Repository: hbase Updated Branches: refs/heads/0.98 81a6fffb3 -> 5716dda86
HBASE-15245 Port HBASE-15177 (Reduce garbage created under high load) to 0.98 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5716dda8 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5716dda8 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5716dda8 Branch: refs/heads/0.98 Commit: 5716dda86b81e6d208bd70fc4ad4749cfd861414 Parents: 81a6fff Author: Andrew Purtell <[email protected]> Authored: Thu Feb 11 12:23:59 2016 -0800 Committer: Andrew Purtell <[email protected]> Committed: Thu Feb 11 15:36:17 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ScannerCallable.java | 10 +- .../org/apache/hadoop/hbase/ipc/IPCUtil.java | 17 ++- .../hbase/ipc/PayloadCarryingRpcController.java | 5 +- .../org/apache/hadoop/hbase/ipc/RpcClient.java | 4 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 19 +++- .../apache/hadoop/hbase/ipc/TestIPCUtil.java | 2 +- .../hadoop/hbase/io/ByteBufferInputStream.java | 107 +++++++++++++++++++ .../hadoop/hbase/util/ByteBufferUtils.java | 85 ++++++++++++++- .../apache/hadoop/hbase/util/UnsafeAccess.java | 100 ++++++++++++++++- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 41 +++---- .../AnnotationReadingPriorityFunction.java | 8 +- .../hadoop/hbase/regionserver/HRegion.java | 5 +- .../hbase/regionserver/HRegionServer.java | 15 ++- 13 files changed, 365 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 8d1c20d..c11160a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -83,7 +83,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { // indicate if it is a remote server call protected boolean isRegionServerRemote = true; private long nextCallSeq = 0; - protected final PayloadCarryingRpcController controller; + protected PayloadCarryingRpcController controller; /** * @param connection which connection @@ -155,6 +155,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { */ @SuppressWarnings("deprecation") public Result [] call() throws IOException { + if (controller == null) { + controller = RpcControllerFactory.instantiate(connection.getConfiguration()) + .newController(); + } if (closed) { if (scannerId != -1) { close(); @@ -300,7 +304,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { ScanRequest request = RequestConverter.buildScanRequest(this.scannerId, 0, true); try { - getStub().scan(null, request); + getStub().scan(controller, request); } catch (ServiceException se) { throw ProtobufUtil.getRemoteException(se); } @@ -317,7 +321,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { getLocation().getRegionInfo().getRegionName(), this.scan, 0, false); try { - ScanResponse response = getStub().scan(null, request); + ScanResponse response = getStub().scan(controller, request); long id = response.getScannerId(); if (logScannerActivity) { LOG.info("Open scanner=" + id + " for scan=" + scan.toString() http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java index f143203..c4481d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; @@ -180,19 +181,18 @@ class IPCUtil { CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, final byte [] cellBlock) throws IOException { - return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length); + return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock)); } /** * @param codec - * @param cellBlock - * @param offset - * @param length + * @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be + * position()'ed at the start of the cell block and limit()'ed at the end. * @return CellScanner to work against the content of <code>cellBlock</code> * @throws IOException */ CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor, - final byte [] cellBlock, final int offset, final int length) + final ByteBuffer cellBlock) throws IOException { // If compressed, decompress it first before passing it on else we will leak compression // resources if the stream is not closed properly after we let it out. @@ -202,13 +202,12 @@ class IPCUtil { if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf); Decompressor poolDecompressor = CodecPool.getDecompressor(compressor); CompressionInputStream cis = - compressor.createInputStream(new ByteArrayInputStream(cellBlock, offset, length), - poolDecompressor); + compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor); ByteBufferOutputStream bbos = null; try { // TODO: This is ugly. The buffer will be resized on us if we guess wrong. // TODO: Reuse buffers. - bbos = new ByteBufferOutputStream((length - offset) * + bbos = new ByteBufferOutputStream(cellBlock.remaining() * this.cellBlockDecompressionMultiplier); IOUtils.copy(cis, bbos); bbos.close(); @@ -221,7 +220,7 @@ class IPCUtil { CodecPool.returnDecompressor(poolDecompressor); } } else { - is = new ByteArrayInputStream(cellBlock, offset, length); + is = new ByteBufferInputStream(cellBlock); } return codec.getDecoder(is); } http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java index 6b3f1e8..2dfe6b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/PayloadCarryingRpcController.java @@ -37,6 +37,8 @@ import com.google.protobuf.RpcController; */ @InterfaceAudience.Private public class PayloadCarryingRpcController implements RpcController, CellScannable { + public static final int PRIORITY_UNSET = -1; + /** * Priority to set on this request. Set it here in controller so available composing the * request. This is the ordained way of setting priorities going forward. We will be @@ -44,7 +46,7 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl */ // Currently only multi call makes use of this. Eventually this should be only way to set // priority. - private int priority = 0; + private int priority = PRIORITY_UNSET; // TODO: Fill out the rest of this class methods rather than return UnsupportedOperationException @@ -71,6 +73,7 @@ public class PayloadCarryingRpcController implements RpcController, CellScannabl /** * @return One-shot cell scanner (you cannot back it up and restart) */ + @Override public CellScanner cellScanner() { return cellScanner; } http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 7f11038..82dd1d1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -1048,7 +1048,9 @@ public class RpcClient { builder.setCellBlockMeta(cellBlockBuilder.build()); } // Only pass priority if there one. Let zero be same as no priority. - if (priority != 0) builder.setPriority(priority); + if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) { + builder.setPriority(priority); + } //noinspection SynchronizeOnNonFinalField RequestHeader header = builder.build(); synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index cee0ace..26c96e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -2424,13 +2424,13 @@ public final class ProtobufUtil { */ public static String getRegionEncodedName( final RegionSpecifier regionSpecifier) throws DoNotRetryIOException { - byte[] value = regionSpecifier.getValue().toByteArray(); + ByteString value = regionSpecifier.getValue(); RegionSpecifierType type = regionSpecifier.getType(); switch (type) { case REGION_NAME: - return HRegionInfo.encodeRegionName(value); + return HRegionInfo.encodeRegionName(value.toByteArray()); case ENCODED_REGION_NAME: - return Bytes.toString(value); + return value.toStringUtf8(); default: throw new DoNotRetryIOException( "Unsupported region specifier type: " + type); @@ -2905,6 +2905,19 @@ public final class ProtobufUtil { codedInput.checkLastTagWas(0); } + public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput, int length) + throws IOException { + codedInput.resetSizeCounter(); + int prevLimit = codedInput.setSizeLimit(length); + + int limit = codedInput.pushLimit(length); + builder.mergeFrom(codedInput); + codedInput.popLimit(limit); + + codedInput.checkLastTagWas(0); + codedInput.setSizeLimit(prevLimit); + } + public static ReplicationLoadSink toReplicationLoadSink( ClusterStatusProtos.ReplicationLoadSink cls) { return new ReplicationLoadSink(cls.getAgeOfLastAppliedOp(), cls.getTimeStampsOfLastAppliedOp()); http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java index 366d13c..35c06a1 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java @@ -78,7 +78,7 @@ public class TestIPCUtil { CellScanner cellScanner = sized? getSizedCellScanner(cells): CellUtil.createCellScanner(Arrays.asList(cells).iterator()); ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner); - cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit()); + cellScanner = util.createCellScanner(codec, compressor, bb); int i = 0; while (cellScanner.advance()) { i++; http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java new file mode 100644 index 0000000..8aee07b --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; + +/** + * Not thread safe! + * <p> + * Please note that the reads will cause position movement on wrapped ByteBuffer. + */ [email protected] +public class ByteBufferInputStream extends InputStream { + + private ByteBuffer buf; + + public ByteBufferInputStream(ByteBuffer buf) { + this.buf = buf; + } + + /** + * Reads the next byte of data from this input stream. The value byte is returned as an + * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available + * because the end of the stream has been reached, the value <code>-1</code> is returned. + * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached. + */ + @Override + public int read() { + if (this.buf.hasRemaining()) { + return (this.buf.get() & 0xff); + } + return -1; + } + + /** + * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from + * given offset). + * @param b the array into which the data is read. + * @param off the start offset in the destination array <code>b</code> + * @param len the maximum number of bytes to read. + * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even + * 1 byte can be read because the end of the stream has been reached. + */ + @Override + public int read(byte[] b, int off, int len) { + int avail = available(); + if (avail <= 0) { + return -1; + } + + if (len > avail) { + len = avail; + } + if (len <= 0) { + return 0; + } + + ByteBufferUtils.copyFromBufferToArray(b, this.buf, this.buf.position(), off, len); + this.buf.position(this.buf.position() + len); // we should advance the buffer position + return len; + } + + /** + * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the + * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is + * equal to the smaller of <code>n</code> and remaining bytes in the stream. + * @param n the number of bytes to be skipped. + * @return the actual number of bytes skipped. + */ + @Override + public long skip(long n) { + long k = Math.min(n, available()); + if (k < 0) { + k = 0; + } + this.buf.position((int) (this.buf.position() + k)); + return k; + } + + /** + * @return the number of remaining bytes that can be read (or skipped + * over) from this input stream. + */ + @Override + public int available() { + return this.buf.remaining(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index b4c6690..bb8f766 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -17,7 +17,6 @@ package org.apache.hadoop.hbase.util; import java.io.ByteArrayOutputStream; -import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; @@ -33,6 +32,7 @@ import org.apache.hadoop.io.WritableUtils; * Utility functions for working with byte buffers, such as reading/writing * variable-length long numbers. */ +@SuppressWarnings("restriction") @InterfaceAudience.Public @InterfaceStability.Evolving public final class ByteBufferUtils { @@ -41,6 +41,8 @@ public final class ByteBufferUtils { private final static int VALUE_MASK = 0x7f; private final static int NEXT_BIT_SHIFT = 7; private final static int NEXT_BIT_MASK = 1 << 7; + private static final boolean UNSAFE_AVAIL = UnsafeAccess.isAvailable(); + private static final boolean UNSAFE_UNALIGNED = UnsafeAccess.unaligned(); private ByteBufferUtils() { } @@ -331,7 +333,10 @@ public final class ByteBufferUtils { } /** - * Copy from one buffer to another from given offset + * Copy from one buffer to another from given offset. + * <p> + * Note : This will advance the position marker of {@code out} but not change the position maker + * for {@code in} * @param out destination buffer * @param in source buffer * @param sourceOffset offset in the source buffer @@ -352,6 +357,27 @@ public final class ByteBufferUtils { } /** + * Copy from one buffer to another from given offset. This will be absolute positional copying and + * won't affect the position of any of the buffers. + * @param out + * @param in + * @param sourceOffset + * @param destinationOffset + * @param length + */ + public static void copyFromBufferToBuffer(ByteBuffer out, ByteBuffer in, int sourceOffset, + int destinationOffset, int length) { + if (in.hasArray() && out.hasArray()) { + System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.arrayOffset() + + destinationOffset, length); + } else { + for (int i = 0; i < length; ++i) { + out.put((destinationOffset + i), in.get(sourceOffset + i)); + } + } + } + + /** * Find length of common prefix of two parts in the buffer * @param buffer Where parts are located. * @param offsetLeft Offset of the first part. @@ -454,4 +480,59 @@ public final class ByteBufferUtils { return output; } + /** + * Copy the given number of bytes from specified offset into a new byte[] + * @param buffer + * @param offset + * @param length + * @return a new byte[] containing the bytes in the specified range + */ + public static byte[] toBytes(ByteBuffer buffer, int offset, int length) { + byte[] output = new byte[length]; + for (int i = 0; i < length; i++) { + output[i] = buffer.get(offset + i); + } + return output; + } + + public static int compareTo(ByteBuffer buf1, int o1, int len1, ByteBuffer buf2, int o2, + int len2) { + if (buf1.hasArray() && buf2.hasArray()) { + return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(), + buf2.arrayOffset() + o2, len2); + } + int end1 = o1 + len1; + int end2 = o2 + len2; + for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) { + int a = buf1.get(i) & 0xFF; + int b = buf2.get(j) & 0xFF; + if (a != b) { + return a - b; + } + } + return len1 - len2; + } + + /** + * Copies specified number of bytes from given offset of 'in' ByteBuffer to + * the array. + * @param out + * @param in + * @param sourceOffset + * @param destinationOffset + * @param length + */ + public static void copyFromBufferToArray(byte[] out, ByteBuffer in, int sourceOffset, + int destinationOffset, int length) { + if (in.hasArray()) { + System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out, destinationOffset, length); + } else if (UNSAFE_AVAIL) { + UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length); + } else { + int oldPos = in.position(); + in.position(sourceOffset); + in.get(out, destinationOffset, length); + in.position(oldPos); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java index 2e86b4d..fabdce8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hbase.util; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.nio.ByteOrder; +import java.nio.ByteBuffer; import java.security.AccessController; import java.security.PrivilegedAction; @@ -29,19 +29,26 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import sun.misc.Unsafe; +import sun.nio.ch.DirectBuffer; @InterfaceAudience.Private @InterfaceStability.Evolving [email protected](value="REC_CATCH_EXCEPTION", + justification="If exception, presume unaligned") public final class UnsafeAccess { private static final Log LOG = LogFactory.getLog(UnsafeAccess.class); public static final Unsafe theUnsafe; + private static boolean unaligned = false; /** The offset to the first element in a byte array. */ public static final int BYTE_ARRAY_BASE_OFFSET; - private static boolean unaligned = false; + // This number limits the number of bytes to copy per call to Unsafe's + // copyMemory method. A limit is imposed to allow for safepoint polling + // during a large copy + static final long UNSAFE_COPY_THRESHOLD = 1024L * 1024L; static { theUnsafe = (Unsafe) AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override @@ -66,7 +73,7 @@ public final class UnsafeAccess { m.setAccessible(true); unaligned = (Boolean) m.invoke(null); } catch (Exception e) { - unaligned = false; + unaligned = false; // FindBugs: Causes REC_CATCH_EXCEPTION. Suppressed. } } else{ BYTE_ARRAY_BASE_OFFSET = -1; @@ -87,6 +94,89 @@ public final class UnsafeAccess { return unaligned; } - public static final boolean littleEndian = ByteOrder.nativeOrder() - .equals(ByteOrder.LITTLE_ENDIAN); + + // APIs to copy data. This will be direct memory location copy and will be much faster + /** + * Copies the bytes from given array's offset to length part into the given buffer. + * @param src + * @param srcOffset + * @param dest + * @param destOffset + * @param length + */ + public static void copy(byte[] src, int srcOffset, ByteBuffer dest, int destOffset, int length) { + long destAddress = destOffset; + Object destBase = null; + if (dest.isDirect()) { + destAddress = destAddress + ((DirectBuffer) dest).address(); + } else { + destAddress = destAddress + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset(); + destBase = dest.array(); + } + long srcAddress = srcOffset + BYTE_ARRAY_BASE_OFFSET; + unsafeCopy(src, srcAddress, destBase, destAddress, length); + } + + private static void unsafeCopy(Object src, long srcAddr, Object dst, long destAddr, long len) { + while (len > 0) { + long size = (len > UNSAFE_COPY_THRESHOLD) ? UNSAFE_COPY_THRESHOLD : len; + theUnsafe.copyMemory(src, srcAddr, dst, destAddr, len); + len -= size; + srcAddr += size; + destAddr += size; + } + } + + /** + * Copies specified number of bytes from given offset of {@code src} ByteBuffer to the + * {@code dest} array. + * + * @param src + * @param srcOffset + * @param dest + * @param destOffset + * @param length + */ + public static void copy(ByteBuffer src, int srcOffset, byte[] dest, int destOffset, + int length) { + long srcAddress = srcOffset; + Object srcBase = null; + if (src.isDirect()) { + srcAddress = srcAddress + ((DirectBuffer) src).address(); + } else { + srcAddress = srcAddress + BYTE_ARRAY_BASE_OFFSET + src.arrayOffset(); + srcBase = src.array(); + } + long destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET; + unsafeCopy(srcBase, srcAddress, dest, destAddress, length); + } + + /** + * Copies specified number of bytes from given offset of {@code src} buffer into the {@code dest} + * buffer. + * + * @param src + * @param srcOffset + * @param dest + * @param destOffset + * @param length + */ + public static void copy(ByteBuffer src, int srcOffset, ByteBuffer dest, int destOffset, + int length) { + long srcAddress, destAddress; + Object srcBase = null, destBase = null; + if (src.isDirect()) { + srcAddress = srcOffset + ((DirectBuffer) src).address(); + } else { + srcAddress = srcOffset + src.arrayOffset() + BYTE_ARRAY_BASE_OFFSET; + srcBase = src.array(); + } + if (dest.isDirect()) { + destAddress = destOffset + ((DirectBuffer) dest).address(); + } else { + destAddress = destOffset + BYTE_ARRAY_BASE_OFFSET + dest.arrayOffset(); + destBase = dest.array(); + } + unsafeCopy(srcBase, srcAddress, destBase, destAddress, length); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 76b426b..49eb883 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Operation; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; import org.apache.hadoop.hbase.io.BoundedByteBufferPool; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; @@ -1366,17 +1367,18 @@ public class RpcServer implements RpcServerInterface { return authorizedUgi; } - private void saslReadAndProcess(byte[] saslToken) throws IOException, + private void saslReadAndProcess(ByteBuffer saslToken) throws IOException, InterruptedException { if (saslContextEstablished) { if (LOG.isDebugEnabled()) - LOG.debug("Have read input token of size " + saslToken.length + LOG.debug("Have read input token of size " + saslToken.limit() + " for processing by saslServer.unwrap()"); if (!useWrap) { processOneRpc(saslToken); } else { - byte [] plaintextData = saslServer.unwrap(saslToken, 0, saslToken.length); + byte[] b = saslToken.array(); + byte [] plaintextData = saslServer.unwrap(b, saslToken.position(), saslToken.limit()); processUnwrappedData(plaintextData); } } else { @@ -1426,10 +1428,10 @@ public class RpcServer implements RpcServerInterface { } } if (LOG.isDebugEnabled()) { - LOG.debug("Have read input token of size " + saslToken.length + LOG.debug("Have read input token of size " + saslToken.limit() + " for processing by saslServer.evaluateResponse()"); } - replyToken = saslServer.evaluateResponse(saslToken); + replyToken = saslServer.evaluateResponse(saslToken.array()); } catch (IOException e) { IOException sendToClient = e; Throwable cause = e; @@ -1605,6 +1607,7 @@ public class RpcServer implements RpcServerInterface { throw new IllegalArgumentException("Unexpected data length " + dataLength + "!! from " + getHostAddress()); } + // TODO: check dataLength against some limit so that the client cannot OOM the server data = ByteBuffer.allocate(dataLength); incRpcCount(); // Increment the rpc count } @@ -1621,9 +1624,9 @@ public class RpcServer implements RpcServerInterface { } boolean headerRead = connectionHeaderRead; if (useSasl) { - saslReadAndProcess(data.array()); + saslReadAndProcess(data); } else { - processOneRpc(data.array()); + processOneRpc(data); } this.data = null; if (!headerRead) { @@ -1658,8 +1661,8 @@ public class RpcServer implements RpcServerInterface { } // Reads the connection header following version - private void processConnectionHeader(byte[] buf) throws IOException { - this.connectionHeader = ConnectionHeader.parseFrom(buf); + private void processConnectionHeader(ByteBuffer buf) throws IOException { + this.connectionHeader = ConnectionHeader.parseFrom(new ByteBufferInputStream(buf)); String serviceName = connectionHeader.getServiceName(); if (serviceName == null) throw new EmptyServiceNameException(); this.service = getService(services, serviceName); @@ -1769,13 +1772,13 @@ public class RpcServer implements RpcServerInterface { if (unwrappedData.remaining() == 0) { unwrappedDataLengthBuffer.clear(); unwrappedData.flip(); - processOneRpc(unwrappedData.array()); + processOneRpc(unwrappedData); unwrappedData = null; } } } - private void processOneRpc(byte[] buf) throws IOException, InterruptedException { + private void processOneRpc(ByteBuffer buf) throws IOException, InterruptedException { if (connectionHeaderRead) { processRequest(buf); } else { @@ -1797,16 +1800,16 @@ public class RpcServer implements RpcServerInterface { * @throws IOException * @throws InterruptedException */ - protected void processRequest(byte[] buf) throws IOException, InterruptedException { - long totalRequestSize = buf.length; + protected void processRequest(ByteBuffer buf) throws IOException, InterruptedException { + long totalRequestSize = buf.limit(); int offset = 0; // Here we read in the header. We avoid having pb // do its default 4k allocation for CodedInputStream. We force it to use backing array. - CodedInputStream cis = CodedInputStream.newInstance(buf, offset, buf.length); + CodedInputStream cis = CodedInputStream.newInstance(buf.array(), offset, buf.limit()); int headerSize = cis.readRawVarint32(); offset = cis.getTotalBytesRead(); Message.Builder builder = RequestHeader.newBuilder(); - ProtobufUtil.mergeFrom(builder, buf, offset, headerSize); + ProtobufUtil.mergeFrom(builder, cis, headerSize); RequestHeader header = (RequestHeader) builder.build(); offset += headerSize; int id = header.getCallId(); @@ -1838,18 +1841,18 @@ public class RpcServer implements RpcServerInterface { if (md == null) throw new UnsupportedOperationException(header.getMethodName()); builder = this.service.getRequestPrototype(md).newBuilderForType(); // To read the varint, I need an inputstream; might as well be a CIS. - cis = CodedInputStream.newInstance(buf, offset, buf.length); + cis.resetSizeCounter(); int paramSize = cis.readRawVarint32(); offset += cis.getTotalBytesRead(); if (builder != null) { - ProtobufUtil.mergeFrom(builder, buf, offset, paramSize); + ProtobufUtil.mergeFrom(builder, cis, paramSize); param = builder.build(); } offset += paramSize; } if (header.hasCellBlockMeta()) { - cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, - buf, offset, buf.length); + buf.position(offset); + cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf); } } catch (Throwable t) { InetSocketAddress address = getListenerAddress(); http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index 16e49f0..dda4240 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoReque import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; @@ -156,10 +155,9 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { if (param == null) { return HConstants.NORMAL_QOS; } - if (param instanceof MultiRequest) { - // The multi call has its priority set in the header. All calls should work this way but - // only this one has been converted so far. No priority == NORMAL_QOS. - return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS; + // Trust the client-set priorities if set + if (header.hasPriority()) { + return header.getPriority(); } String cls = param.getClass().getName(); Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls); http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index fb18d84..8674308 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4065,8 +4065,9 @@ public class HRegion implements HeapSize { // , Writable{ // Here we separate all scanners into two lists - scanner that provide data required // by the filter to operate (scanners list) and all others (joinedScanners list). - List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(); - List<KeyValueScanner> joinedScanners = new ArrayList<KeyValueScanner>(); + List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); + List<KeyValueScanner> joinedScanners = + new ArrayList<KeyValueScanner>(scan.getFamilyMap().size()); if (additionalScanners != null) { scanners.addAll(additionalScanners); } http://git-wip-us.apache.org/repos/asf/hbase/blob/5716dda8/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 8c09416..439bee7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -4492,8 +4492,19 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa */ protected HRegion getRegion( final RegionSpecifier regionSpecifier) throws IOException { - return getRegionByEncodedName(regionSpecifier.getValue().toByteArray(), - ProtobufUtil.getRegionEncodedName(regionSpecifier)); + ByteString value = regionSpecifier.getValue(); + RegionSpecifierType type = regionSpecifier.getType(); + switch (type) { + case REGION_NAME: + byte[] regionName = value.toByteArray(); + String encodedRegionName = HRegionInfo.encodeRegionName(regionName); + return getRegionByEncodedName(regionName, encodedRegionName); + case ENCODED_REGION_NAME: + return getRegionByEncodedName(value.toStringUtf8()); + default: + throw new DoNotRetryIOException( + "Unsupported region specifier type: " + type); + } } /**
