This is an automated email from the ASF dual-hosted git repository. openinx pushed a commit to branch HBASE-21879 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit e66c9b88337bd96c9683588c08169d7ba1323c09 Author: huzheng <[email protected]> AuthorDate: Sat Feb 16 17:16:09 2019 +0800 HBASE-21916 Abstract an ByteBuffAllocator to allocate/free ByteBuffer in ByteBufferPool --- .../apache/hadoop/hbase/ipc/CellBlockBuilder.java | 9 +- hbase-common/pom.xml | 4 + .../apache/hadoop/hbase/io/ByteBuffAllocator.java | 282 +++++++++++++++++++ .../hbase/io/ByteBufferListOutputStream.java | 40 ++- .../org/apache/hadoop/hbase/io/ByteBufferPool.java | 155 ----------- .../hbase/io/encoding/CopyKeyDataBlockEncoder.java | 2 +- .../hbase/io/encoding/DiffKeyDeltaEncoder.java | 2 +- .../hbase/io/encoding/FastDiffDeltaEncoder.java | 2 +- .../hbase/io/encoding/PrefixKeyDeltaEncoder.java | 2 +- .../hadoop/hbase/io/encoding/RowIndexSeekerV1.java | 2 +- .../java/org/apache/hadoop/hbase/nio/ByteBuff.java | 146 +++++----- .../org/apache/hadoop/hbase/nio/MultiByteBuff.java | 98 +++++-- .../java/org/apache/hadoop/hbase/nio/RefCnt.java | 49 ++++ .../apache/hadoop/hbase/nio/SingleByteBuff.java | 92 ++++-- .../apache/hadoop/hbase/util/ByteBufferArray.java | 10 +- .../apache/hadoop/hbase/util/ByteBufferUtils.java | 31 ++- .../hadoop/hbase/io/TestByteBuffAllocator.java | 309 +++++++++++++++++++++ .../hbase/io/TestByteBufferListOutputStream.java | 18 +- .../apache/hadoop/hbase/io/TestByteBufferPool.java | 67 ----- .../apache/hadoop/hbase/nio/TestMultiByteBuff.java | 4 +- .../apache/hadoop/hbase/io/hfile/Cacheable.java | 7 +- .../hadoop/hbase/ipc/NettyRpcFrameDecoder.java | 2 +- .../apache/hadoop/hbase/ipc/NettyRpcServer.java | 2 +- .../apache/hadoop/hbase/ipc/NettyServerCall.java | 12 +- .../hadoop/hbase/ipc/NettyServerRpcConnection.java | 9 +- .../org/apache/hadoop/hbase/ipc/RpcServer.java | 96 +------ .../org/apache/hadoop/hbase/ipc/ServerCall.java | 18 +- .../apache/hadoop/hbase/ipc/SimpleRpcServer.java | 2 +- .../apache/hadoop/hbase/ipc/SimpleServerCall.java | 15 +- .../hbase/ipc/SimpleServerRpcConnection.java | 26 +- .../client/TestAsyncTableGetMultiThreaded.java | 4 +- .../hbase/client/TestServerLoadDurability.java | 8 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 2 +- .../org/apache/hadoop/hbase/ipc/TestRpcServer.java | 144 ---------- 34 files changed, 974 insertions(+), 697 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java index 8d68e87..111f768 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/CellBlockBuilder.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,6 @@ import org.apache.hadoop.hbase.io.ByteBuffInputStream; import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; -import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.ClassSize; @@ -208,7 +208,7 @@ class CellBlockBuilder { * @param codec to use for encoding * @param compressor to use for encoding * @param cellScanner to encode - * @param pool Pool of ByteBuffers to make use of. + * @param allocator to allocate the {@link ByteBuff}. * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has * been flipped and is ready for reading. Use limit to find total size. If @@ -217,15 +217,14 @@ class CellBlockBuilder { * @throws IOException if encoding the cells fail */ public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor, - CellScanner cellScanner, ByteBufferPool pool) throws IOException { + CellScanner cellScanner, ByteBuffAllocator allocator) throws IOException { if (cellScanner == null) { return null; } if (codec == null) { throw new CellScannerButNoCodecException(); } - assert pool != null; - ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool); + ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(allocator); encodeCellsTo(bbos, cellScanner, codec, compressor); if (bbos.size() == 0) { bbos.releaseResources(); diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml index 7d7dea2..c23b9d4 100644 --- a/hbase-common/pom.xml +++ b/hbase-common/pom.xml @@ -151,6 +151,10 @@ <artifactId>hbase-shaded-miscellaneous</artifactId> </dependency> <dependency> + <groupId>org.apache.hbase.thirdparty</groupId> + <artifactId>hbase-shaded-netty</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java new file mode 100644 index 0000000..1833462 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java @@ -0,0 +1,282 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and + * it provide high-level interfaces for upstream. when allocating desired memory size, it will + * return {@link ByteBuff}, if we are sure that those ByteBuffers have reached the end of life + * cycle, we must do the {@link ByteBuff#release()} to return back the buffers to the pool, + * otherwise ByteBuffers leak will happen, and the NIO ByteBuffer pool may be exhausted. there's + * possible that the desired memory size is large than ByteBufferPool has, we'll downgrade to + * allocate ByteBuffers from heap which meaning the GC pressure may increase again. Of course, an + * better way is increasing the ByteBufferPool size if we detected this case. <br/> + * <br/> + * On the other hand, for better memory utilization, we have set an lower bound named + * minSizeForReservoirUse in this allocator, and if the desired size is less than + * minSizeForReservoirUse, the allocator will just allocate the ByteBuffer from heap and let the JVM + * free its memory, because it's too wasting to allocate a single fixed-size ByteBuffer for some + * small objects. <br/> + * <br/> + * We recommend to use this class to allocate/free {@link ByteBuff} in the RPC layer or the entire + * read/write path, because it hide the details of memory management and its APIs are more friendly + * to the upper layer. + */ [email protected] +public class ByteBuffAllocator { + + private static final Logger LOG = LoggerFactory.getLogger(ByteBuffAllocator.class); + + public static final String MAX_BUFFER_COUNT_KEY = "hbase.ipc.server.allocator.max.buffer.count"; + + public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.allocator.buffer.size"; + // 64 KB. Making it same as the chunk size what we will write/read to/from the socket channel. + public static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + + public static final String MIN_ALLOCATE_SIZE_KEY = + "hbase.ipc.server.reservoir.minimal.allocating.size"; + + public static final Recycler NONE = () -> { + }; + + public interface Recycler { + void free(); + } + + private final boolean reservoirEnabled; + private final int bufSize; + private final int maxBufCount; + private final AtomicInteger usedBufCount = new AtomicInteger(0); + + private boolean maxPoolSizeInfoLevelLogged = false; + + // If the desired size is at least this size, it'll allocated from ByteBufferPool, otherwise it'll + // allocated from heap for better utilization. We make this to be 1/6th of the pool buffer size. + private final int minSizeForReservoirUse; + + private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>(); + + /** + * Initialize an {@link ByteBuffAllocator} which will try to allocate ByteBuffers from off-heap if + * reservoir is enabled and the reservoir has enough buffers, otherwise the allocator will just + * allocate the insufficient buffers from on-heap to meet the requirement. + * @param conf which get the arguments to initialize the allocator. + * @param reservoirEnabled indicate whether the reservoir is enabled or disabled. + * @return ByteBuffAllocator to manage the byte buffers. + */ + public static ByteBuffAllocator create(Configuration conf, boolean reservoirEnabled) { + int poolBufSize = conf.getInt(BUFFER_SIZE_KEY, DEFAULT_BUFFER_SIZE); + if (reservoirEnabled) { + // The max number of buffers to be pooled in the ByteBufferPool. The default value been + // selected based on the #handlers configured. When it is read request, 2 MB is the max size + // at which we will send back one RPC request. Means max we need 2 MB for creating the + // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we + // include the heap size overhead of each cells also.) Considering 2 MB, we will need + // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size + // is by default 64 KB. + // In case of read request, at the end of the handler process, we will make the response + // cellblock and add the Call to connection's response Q and a single Responder thread takes + // connections and responses from that one by one and do the socket write. So there is chances + // that by the time a handler originated response is actually done writing to socket and so + // released the BBs it used, the handler might have processed one more read req. On an avg 2x + // we consider and consider that also for the max buffers to pool + int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize; + int maxBuffCount = + conf.getInt(MAX_BUFFER_COUNT_KEY, conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2); + int minSizeForReservoirUse = conf.getInt(MIN_ALLOCATE_SIZE_KEY, poolBufSize / 6); + return new ByteBuffAllocator(true, maxBuffCount, poolBufSize, minSizeForReservoirUse); + } else { + return new ByteBuffAllocator(false, 0, poolBufSize, Integer.MAX_VALUE); + } + } + + /** + * Initialize an {@link ByteBuffAllocator} which only allocate ByteBuffer from on-heap, it's + * designed for testing purpose or disabled reservoir case. + * @return allocator to allocate on-heap ByteBuffer. + */ + public static ByteBuffAllocator createOnHeap() { + return new ByteBuffAllocator(false, 0, DEFAULT_BUFFER_SIZE, Integer.MAX_VALUE); + } + + @VisibleForTesting + ByteBuffAllocator(boolean reservoirEnabled, int maxBufCount, int bufSize, + int minSizeForReservoirUse) { + this.reservoirEnabled = reservoirEnabled; + this.maxBufCount = maxBufCount; + this.bufSize = bufSize; + this.minSizeForReservoirUse = minSizeForReservoirUse; + } + + public boolean isReservoirEnabled() { + return reservoirEnabled; + } + + @VisibleForTesting + public int getQueueSize() { + return this.buffers.size(); + } + + /** + * Allocate an buffer with buffer size from ByteBuffAllocator, Note to call the + * {@link ByteBuff#release()} if no need any more, otherwise the memory leak happen in NIO + * ByteBuffer pool. + * @return an ByteBuff with the buffer size. + */ + public SingleByteBuff allocateOneBuffer() { + if (isReservoirEnabled()) { + ByteBuffer bb = getBuffer(); + if (bb != null) { + return new SingleByteBuff(() -> putbackBuffer(bb), bb); + } + } + // Allocated from heap, let the JVM free its memory. + return new SingleByteBuff(NONE, ByteBuffer.allocate(this.bufSize)); + } + + /** + * Allocate size bytes from the ByteBufAllocator, Note to call the {@link ByteBuff#release()} if + * no need any more, otherwise the memory leak happen in NIO ByteBuffer pool. + * @param size to allocate + * @return an ByteBuff with the desired size. + */ + public ByteBuff allocate(int size) { + if (size < 0) { + throw new IllegalArgumentException("size to allocate should >=0"); + } + // If disabled the reservoir, just allocate it from on-heap. + if (!isReservoirEnabled() || size == 0) { + return new SingleByteBuff(NONE, ByteBuffer.allocate(size)); + } + int reminder = size % bufSize; + int len = size / bufSize + (reminder > 0 ? 1 : 0); + List<ByteBuffer> bbs = new ArrayList<>(len); + // Allocate from ByteBufferPool until the remaining is less than minSizeForReservoirUse or + // reservoir is exhausted. + int remain = size; + while (remain >= minSizeForReservoirUse) { + ByteBuffer bb = this.getBuffer(); + if (bb == null) { + break; + } + bbs.add(bb); + remain -= bufSize; + } + int lenFromReservoir = bbs.size(); + if (remain > 0) { + // If the last ByteBuffer is too small or the reservoir can not provide more ByteBuffers, we + // just allocate the ByteBuffer from on-heap. + bbs.add(ByteBuffer.allocate(remain)); + } + ByteBuff bb = wrap(bbs, () -> { + for (int i = 0; i < lenFromReservoir; i++) { + this.putbackBuffer(bbs.get(i)); + } + }); + bb.limit(size); + return bb; + } + + public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) { + if (buffers == null || buffers.length == 0) { + throw new IllegalArgumentException("buffers shouldn't be null or empty"); + } + return buffers.length == 1 ? new SingleByteBuff(recycler, buffers[0]) + : new MultiByteBuff(recycler, buffers); + } + + public static ByteBuff wrap(ByteBuffer[] buffers) { + return wrap(buffers, NONE); + } + + public static ByteBuff wrap(List<ByteBuffer> buffers, Recycler recycler) { + if (buffers == null || buffers.size() == 0) { + throw new IllegalArgumentException("buffers shouldn't be null or empty"); + } + return buffers.size() == 1 ? new SingleByteBuff(recycler, buffers.get(0)) + : new MultiByteBuff(recycler, buffers.toArray(new ByteBuffer[0])); + } + + public static ByteBuff wrap(List<ByteBuffer> buffers) { + return wrap(buffers, NONE); + } + + /** + * @return One free DirectByteBuffer from the pool. If no free ByteBuffer and we have not reached + * the maximum pool size, it will create a new one and return. In case of max pool size + * also reached, will return null. When pool returned a ByteBuffer, make sure to return it + * back to pool after use. + */ + private ByteBuffer getBuffer() { + ByteBuffer bb = buffers.poll(); + if (bb != null) { + // To reset the limit to capacity and position to 0, must clear here. + bb.clear(); + return bb; + } + while (true) { + int c = this.usedBufCount.intValue(); + if (c >= this.maxBufCount) { + if (!maxPoolSizeInfoLevelLogged) { + LOG.info("Pool already reached its max capacity : {} and no free buffers now. Consider " + + "increasing the value for '{}' ?", + maxBufCount, MAX_BUFFER_COUNT_KEY); + maxPoolSizeInfoLevelLogged = true; + } + return null; + } + if (!this.usedBufCount.compareAndSet(c, c + 1)) { + continue; + } + return ByteBuffer.allocateDirect(bufSize); + } + } + + /** + * Return back a ByteBuffer after its use. Don't read/write the ByteBuffer after the returning. + * @param buf ByteBuffer to return. + */ + private void putbackBuffer(ByteBuffer buf) { + if (buf.capacity() != bufSize || (reservoirEnabled ^ buf.isDirect())) { + LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored"); + return; + } + buffers.offer(buf); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java index 0b97abb..e8bd322 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java @@ -23,6 +23,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -39,18 +41,17 @@ import org.slf4j.LoggerFactory; public class ByteBufferListOutputStream extends ByteBufferOutputStream { private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class); - private ByteBufferPool pool; + private ByteBuffAllocator allocator; // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If // it is not available will make a new one our own and keep writing to that. We keep track of all // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure // to return back all of them to pool - protected List<ByteBuffer> allBufs = new ArrayList<>(); - protected List<ByteBuffer> bufsFromPool = new ArrayList<>(); + protected List<SingleByteBuff> allBufs = new ArrayList<>(); private boolean lastBufFlipped = false;// Indicate whether the curBuf/lastBuf is flipped already - public ByteBufferListOutputStream(ByteBufferPool pool) { - this.pool = pool; + public ByteBufferListOutputStream(ByteBuffAllocator allocator) { + this.allocator = allocator; allocateNewBuffer(); } @@ -58,18 +59,10 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { if (this.curBuf != null) { this.curBuf.flip();// On the current buf set limit = pos and pos = 0. } - // Get an initial BB to work with from the pool - this.curBuf = this.pool.getBuffer(); - if (this.curBuf == null) { - // No free BB at this moment. Make a new one. The pool returns off heap BBs. Don't make off - // heap BB on demand. It is difficult to account for all such and so proper sizing of Max - // direct heap size. See HBASE-15525 also for more details. - // Make BB with same size of pool's buffer size. - this.curBuf = ByteBuffer.allocate(this.pool.getBufferSize()); - } else { - this.bufsFromPool.add(this.curBuf); - } - this.allBufs.add(this.curBuf); + // Get an initial ByteBuffer from the allocator. + SingleByteBuff sbb = allocator.allocateOneBuffer(); + this.curBuf = sbb.nioByteBuffers()[0]; + this.allBufs.add(sbb); } @Override @@ -118,11 +111,8 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { LOG.debug(e.toString(), e); } // Return back all the BBs to pool - if (this.bufsFromPool != null) { - for (int i = 0; i < this.bufsFromPool.size(); i++) { - this.pool.putbackBuffer(this.bufsFromPool.get(i)); - } - this.bufsFromPool = null; + for (ByteBuff buf : this.allBufs) { + buf.release(); } this.allBufs = null; this.curBuf = null; @@ -144,7 +134,11 @@ public class ByteBufferListOutputStream extends ByteBufferOutputStream { // All the other BBs are already flipped while moving to the new BB. curBuf.flip(); } - return this.allBufs; + List<ByteBuffer> bbs = new ArrayList<>(this.allBufs.size()); + for (SingleByteBuff bb : this.allBufs) { + bbs.add(bb.nioByteBuffers()[0]); + } + return bbs; } @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java deleted file mode 100644 index caca20b..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferPool.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import java.nio.ByteBuffer; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - -/** - * Like Hadoops' ByteBufferPool only you do not specify desired size when getting a ByteBuffer. This - * pool keeps an upper bound on the count of ByteBuffers in the pool and a fixed size of ByteBuffer - * that it will create. When requested, if a free ByteBuffer is already present, it will return - * that. And when no free ByteBuffer available and we are below the max count, it will create a new - * one and return that. - * - * <p> - * Note: This pool returns off heap ByteBuffers by default. If on heap ByteBuffers to be pooled, - * pass 'directByteBuffer' as false while construction of the pool. - * <p> - * This class is thread safe. - * - * @see ByteBufferListOutputStream - */ [email protected] -public class ByteBufferPool { - private static final Logger LOG = LoggerFactory.getLogger(ByteBufferPool.class); - // TODO better config names? - // hbase.ipc.server.reservoir.initial.max -> hbase.ipc.server.reservoir.max.buffer.count - // hbase.ipc.server.reservoir.initial.buffer.size -> hbase.ipc.server.reservoir.buffer.size - public static final String MAX_POOL_SIZE_KEY = "hbase.ipc.server.reservoir.initial.max"; - public static final String BUFFER_SIZE_KEY = "hbase.ipc.server.reservoir.initial.buffer.size"; - public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;// 64 KB. Making it same as the chunk size - // what we will write/read to/from the - // socket channel. - private final Queue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>(); - - private final int bufferSize; - private final int maxPoolSize; - private AtomicInteger count; // Count of the BBs created already for this pool. - private final boolean directByteBuffer; //Whether this pool should return DirectByteBuffers - private boolean maxPoolSizeInfoLevelLogged = false; - - /** - * @param bufferSize Size of each buffer created by this pool. - * @param maxPoolSize Max number of buffers to keep in this pool. - */ - public ByteBufferPool(int bufferSize, int maxPoolSize) { - this(bufferSize, maxPoolSize, true); - } - - /** - * @param bufferSize Size of each buffer created by this pool. - * @param maxPoolSize Max number of buffers to keep in this pool. - * @param directByteBuffer Whether to create direct ByteBuffer or on heap ByteBuffer. - */ - public ByteBufferPool(int bufferSize, int maxPoolSize, boolean directByteBuffer) { - this.bufferSize = bufferSize; - this.maxPoolSize = maxPoolSize; - this.directByteBuffer = directByteBuffer; - // TODO can add initialPoolSize config also and make those many BBs ready for use. - LOG.info("Created with bufferSize={} and maxPoolSize={}", - org.apache.hadoop.util.StringUtils.byteDesc(bufferSize), - org.apache.hadoop.util.StringUtils.byteDesc(maxPoolSize)); - this.count = new AtomicInteger(0); - } - - /** - * @return One free ByteBuffer from the pool. If no free ByteBuffer and we have not reached the - * maximum pool size, it will create a new one and return. In case of max pool size also - * reached, will return null. When pool returned a ByteBuffer, make sure to return it back - * to pool after use. - * @see #putbackBuffer(ByteBuffer) - */ - public ByteBuffer getBuffer() { - ByteBuffer bb = buffers.poll(); - if (bb != null) { - // Clear sets limit == capacity. Position == 0. - bb.clear(); - return bb; - } - while (true) { - int c = this.count.intValue(); - if (c >= this.maxPoolSize) { - if (maxPoolSizeInfoLevelLogged) { - if (LOG.isDebugEnabled()) { - LOG.debug("Pool already reached its max capacity : " + this.maxPoolSize - + " and no free buffers now. Consider increasing the value for '" - + MAX_POOL_SIZE_KEY + "' ?"); - } - } else { - LOG.info("Pool already reached its max capacity : " + this.maxPoolSize - + " and no free buffers now. Consider increasing the value for '" + MAX_POOL_SIZE_KEY - + "' ?"); - maxPoolSizeInfoLevelLogged = true; - } - return null; - } - if (!this.count.compareAndSet(c, c + 1)) { - continue; - } - if (LOG.isTraceEnabled()) { - LOG.trace("Creating a new offheap ByteBuffer of size: " + this.bufferSize); - } - return this.directByteBuffer ? ByteBuffer.allocateDirect(this.bufferSize) - : ByteBuffer.allocate(this.bufferSize); - } - } - - /** - * Return back a ByteBuffer after its use. Do not try to return put back a ByteBuffer, not - * obtained from this pool. - * @param buf ByteBuffer to return. - */ - public void putbackBuffer(ByteBuffer buf) { - if (buf.capacity() != this.bufferSize || (this.directByteBuffer ^ buf.isDirect())) { - LOG.warn("Trying to put a buffer, not created by this pool! Will be just ignored"); - return; - } - buffers.offer(buf); - } - - public int getBufferSize() { - return this.bufferSize; - } - - /** - * @return Number of free buffers - */ - @VisibleForTesting - public int getQueueSize() { - return buffers.size(); - } -} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java index 8bc7974..d7ab009 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java @@ -98,7 +98,7 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { currentBuffer.skip(current.tagsLength); } if (includesMvcc()) { - current.memstoreTS = ByteBuff.readVLong(currentBuffer); + current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); } else { current.memstoreTS = 0; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java index 01f0a9d..ab93d19 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java @@ -477,7 +477,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { decodeTags(); } if (includesMvcc()) { - current.memstoreTS = ByteBuff.readVLong(currentBuffer); + current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); } else { current.memstoreTS = 0; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java index baa1856..aa9a436 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java @@ -501,7 +501,7 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { decodeTags(); } if (includesMvcc()) { - current.memstoreTS = ByteBuff.readVLong(currentBuffer); + current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); } else { current.memstoreTS = 0; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java index 63da7e7..176bea3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java @@ -213,7 +213,7 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { decodeTags(); } if (includesMvcc()) { - current.memstoreTS = ByteBuff.readVLong(currentBuffer); + current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); } else { current.memstoreTS = 0; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java index 14d847c..9c0532e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/RowIndexSeekerV1.java @@ -282,7 +282,7 @@ public class RowIndexSeekerV1 extends AbstractEncodedSeeker { decodeTags(); } if (includesMvcc()) { - current.memstoreTS = ByteBuff.readVLong(currentBuffer); + current.memstoreTS = ByteBufferUtils.readVLong(currentBuffer); } else { current.memstoreTS = 0; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java index 68cf56e..1ee3607 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -24,22 +24,81 @@ import java.nio.channels.ReadableByteChannel; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ObjectIntPair; -import org.apache.hadoop.io.WritableUtils; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted; +import org.apache.hbase.thirdparty.io.netty.util.internal.ObjectUtil; + + /** - * An abstract class that abstracts out as to how the byte buffers are used, - * either single or multiple. We have this interface because the java's ByteBuffers - * cannot be sub-classed. This class provides APIs similar to the ones provided - * in java's nio ByteBuffers and allows you to do positional reads/writes and relative - * reads and writes on the underlying BB. In addition to it, we have some additional APIs which - * helps us in the read path. + * An abstract class that abstracts out as to how the byte buffers are used, either single or + * multiple. We have this interface because the java's ByteBuffers cannot be sub-classed. This class + * provides APIs similar to the ones provided in java's nio ByteBuffers and allows you to do + * positional reads/writes and relative reads and writes on the underlying BB. In addition to it, we + * have some additional APIs which helps us in the read path. <br/> + * The ByteBuff implement {@link ReferenceCounted} interface which mean need to maintains a + * {@link RefCnt} inside, if ensure that the ByteBuff won't be used any more, we must do a + * {@link ByteBuff#release()} to recycle its NIO ByteBuffers. when considering the + * {@link ByteBuff#duplicate()} or {@link ByteBuff#slice()}, releasing either the duplicated one or + * the original one will free its memory, because they share the same NIO ByteBuffers. when you want + * to retain the NIO ByteBuffers even if the origin one called {@link ByteBuff#release()}, you can + * do like this: + * + * <pre> + * ByteBuff original = ...; + * ByteBuff dup = original.duplicate(); + * dup.retain(); + * original.release(); + * // The NIO buffers can still be accessed unless you release the duplicated one + * dup.get(...); + * dup.release(); + * // Both the original and dup can not access the NIO buffers any more. + * </pre> */ @InterfaceAudience.Private -// TODO to have another name. This can easily get confused with netty's ByteBuf -public abstract class ByteBuff { +public abstract class ByteBuff implements ReferenceCounted { + private static final String REFERENCE_COUNT_NAME = "ReferenceCount"; private static final int NIO_BUFFER_LIMIT = 64 * 1024; // should not be more than 64KB. + protected RefCnt refCnt; + + /*************************** Methods for reference count **********************************/ + + protected void checkRefCount() { + ObjectUtil.checkPositive(refCnt(), REFERENCE_COUNT_NAME); + } + + public int refCnt() { + return refCnt.refCnt(); + } + + @Override + public boolean release() { + return refCnt.release(); + } + + @Override + public final ByteBuff retain(int increment) { + throw new UnsupportedOperationException(); + } + + @Override + public final boolean release(int increment) { + throw new UnsupportedOperationException(); + } + + @Override + public final ByteBuff touch() { + throw new UnsupportedOperationException(); + } + + @Override + public final ByteBuff touch(Object hint) { + throw new UnsupportedOperationException(); + } + + /******************************* Methods for ByteBuff **************************************/ + /** * @return this ByteBuff's current position */ @@ -491,78 +550,11 @@ public abstract class ByteBuff { return tmpLength; } - /** - * Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a - * {@link ByteBuff}. - */ - public static long readVLong(ByteBuff in) { - byte firstByte = in.get(); - int len = WritableUtils.decodeVIntSize(firstByte); - if (len == 1) { - return firstByte; - } - long i = 0; - for (int idx = 0; idx < len-1; idx++) { - byte b = in.get(); - i = i << 8; - i = i | (b & 0xFF); - } - return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i); - } - - /** - * Search sorted array "a" for byte "key". - * - * @param a Array to search. Entries must be sorted and unique. - * @param fromIndex First index inclusive of "a" to include in the search. - * @param toIndex Last index exclusive of "a" to include in the search. - * @param key The byte to search for. - * @return The index of key if found. If not found, return -(index + 1), where - * negative indicates "not found" and the "index + 1" handles the "-0" - * case. - */ - public static int unsignedBinarySearch(ByteBuff a, int fromIndex, int toIndex, byte key) { - int unsignedKey = key & 0xff; - int low = fromIndex; - int high = toIndex - 1; - - while (low <= high) { - int mid = low + ((high - low) >> 1); - int midVal = a.get(mid) & 0xff; - - if (midVal < unsignedKey) { - low = mid + 1; - } else if (midVal > unsignedKey) { - high = mid - 1; - } else { - return mid; // key found - } - } - return -(low + 1); // key not found. - } + public abstract ByteBuffer[] nioByteBuffers(); @Override public String toString() { return this.getClass().getSimpleName() + "[pos=" + position() + ", lim=" + limit() + ", cap= " + capacity() + "]"; } - - public static String toStringBinary(final ByteBuff b, int off, int len) { - StringBuilder result = new StringBuilder(); - // Just in case we are passed a 'len' that is > buffer length... - if (off >= b.capacity()) - return result.toString(); - if (off + len > b.capacity()) - len = b.capacity() - off; - for (int i = off; i < off + len; ++i) { - int ch = b.get(i) & 0xFF; - if ((ch >= '0' && ch <= '9') || (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') - || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) { - result.append((char) ch); - } else { - result.append(String.format("\\x%02X", ch)); - } - } - return result.toString(); - } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java index 97f5141..e9eadc7 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.nio; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE; + import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; @@ -24,13 +26,12 @@ import java.nio.ByteBuffer; import java.nio.InvalidMarkException; import java.nio.channels.ReadableByteChannel; +import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ObjectIntPair; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - /** * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int, @@ -53,6 +54,15 @@ public class MultiByteBuff extends ByteBuff { private final int[] itemBeginPos; public MultiByteBuff(ByteBuffer... items) { + this(NONE, items); + } + + public MultiByteBuff(Recycler recycler, ByteBuffer... items) { + this(new RefCnt(recycler), items); + } + + private MultiByteBuff(RefCnt refCnt, ByteBuffer... items) { + this.refCnt = refCnt; assert items != null; assert items.length > 0; this.items = items; @@ -75,8 +85,9 @@ public class MultiByteBuff extends ByteBuff { this.limitedItemIndex = this.items.length - 1; } - private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex, - int curItemIndex, int markedIndex) { + private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int limit, + int limitedIndex, int curItemIndex, int markedIndex) { + this.refCnt = refCnt; this.items = items; this.curItemIndex = curItemIndex; this.curItem = this.items[this.curItemIndex]; @@ -117,6 +128,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public int capacity() { + checkRefCount(); int c = 0; for (ByteBuffer item : this.items) { c += item.capacity(); @@ -131,12 +143,14 @@ public class MultiByteBuff extends ByteBuff { */ @Override public byte get(int index) { + checkRefCount(); int itemIndex = getItemIndex(index); return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); } @Override public byte getByteAfterPosition(int offset) { + checkRefCount(); // Mostly the index specified will land within this current item. Short circuit for that int index = offset + this.position(); int itemIndex = getItemIndexFromCurItemIndex(index); @@ -179,6 +193,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public int getInt(int index) { + checkRefCount(); // Mostly the index specified will land within this current item. Short circuit for that int itemIndex; if (this.itemBeginPos[this.curItemIndex] <= index @@ -192,6 +207,7 @@ public class MultiByteBuff extends ByteBuff { @Override public int getIntAfterPosition(int offset) { + checkRefCount(); // Mostly the index specified will land within this current item. Short circuit for that int index = offset + this.position(); int itemIndex; @@ -210,6 +226,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public short getShort(int index) { + checkRefCount(); // Mostly the index specified will land within this current item. Short circuit for that int itemIndex; if (this.itemBeginPos[this.curItemIndex] <= index @@ -238,6 +255,7 @@ public class MultiByteBuff extends ByteBuff { @Override public short getShortAfterPosition(int offset) { + checkRefCount(); // Mostly the index specified will land within this current item. Short circuit for that int index = offset + this.position(); int itemIndex; @@ -319,6 +337,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public long getLong(int index) { + checkRefCount(); // Mostly the index specified will land within this current item. Short circuit for that int itemIndex; if (this.itemBeginPos[this.curItemIndex] <= index @@ -332,6 +351,7 @@ public class MultiByteBuff extends ByteBuff { @Override public long getLongAfterPosition(int offset) { + checkRefCount(); // Mostly the index specified will land within this current item. Short circuit for that int index = offset + this.position(); int itemIndex; @@ -348,6 +368,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public int position() { + checkRefCount(); return itemBeginPos[this.curItemIndex] + this.curItem.position(); } @@ -358,6 +379,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff position(int position) { + checkRefCount(); // Short circuit for positioning within the cur item. Mostly that is the case. if (this.itemBeginPos[this.curItemIndex] <= position && this.itemBeginPos[this.curItemIndex + 1] > position) { @@ -385,6 +407,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff rewind() { + checkRefCount(); for (int i = 0; i < this.items.length; i++) { this.items[i].rewind(); } @@ -400,6 +423,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff mark() { + checkRefCount(); this.markedItemIndex = this.curItemIndex; this.curItem.mark(); return this; @@ -412,6 +436,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff reset() { + checkRefCount(); // when the buffer is moved to the next one.. the reset should happen on the previous marked // item and the new one should be taken as the base if (this.markedItemIndex < 0) throw new InvalidMarkException(); @@ -433,6 +458,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public int remaining() { + checkRefCount(); int remain = 0; for (int i = curItemIndex; i < items.length; i++) { remain += items[i].remaining(); @@ -446,6 +472,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public final boolean hasRemaining() { + checkRefCount(); return this.curItem.hasRemaining() || (this.curItemIndex < this.limitedItemIndex && this.items[this.curItemIndex + 1].hasRemaining()); } @@ -457,6 +484,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public byte get() { + checkRefCount(); if (this.curItem.remaining() == 0) { if (items.length - 1 == this.curItemIndex) { // means cur item is the last one and we wont be able to read a long. Throw exception @@ -476,6 +504,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public short getShort() { + checkRefCount(); int remaining = this.curItem.remaining(); if (remaining >= Bytes.SIZEOF_SHORT) { return this.curItem.getShort(); @@ -494,6 +523,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public int getInt() { + checkRefCount(); int remaining = this.curItem.remaining(); if (remaining >= Bytes.SIZEOF_INT) { return this.curItem.getInt(); @@ -514,6 +544,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public long getLong() { + checkRefCount(); int remaining = this.curItem.remaining(); if (remaining >= Bytes.SIZEOF_LONG) { return this.curItem.getLong(); @@ -545,6 +576,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public void get(byte[] dst, int offset, int length) { + checkRefCount(); while (length > 0) { int toRead = Math.min(length, this.curItem.remaining()); ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset, @@ -560,6 +592,7 @@ public class MultiByteBuff extends ByteBuff { @Override public void get(int sourceOffset, byte[] dst, int offset, int length) { + checkRefCount(); int itemIndex = getItemIndex(sourceOffset); ByteBuffer item = this.items[itemIndex]; sourceOffset = sourceOffset - this.itemBeginPos[itemIndex]; @@ -583,6 +616,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff limit(int limit) { + checkRefCount(); this.limit = limit; // Normally the limit will try to limit within the last BB item int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex]; @@ -622,29 +656,30 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff slice() { + checkRefCount(); ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1]; for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) { copy[j] = this.items[i].slice(); } - return new MultiByteBuff(copy); + return new MultiByteBuff(refCnt, copy); } /** - * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark - * of the new MBB will be independent than that of the original MBB. - * The content of the new MBB will start at this MBB's current position - * The position, limit and mark of the new MBB would be identical to this MBB in terms of - * values. - * @return a sliced MBB + * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark of the + * new MBB will be independent than that of the original MBB. The content of the new MBB will + * start at this MBB's current position The position, limit and mark of the new MBB would be + * identical to this MBB in terms of values. + * @return a duplicated MBB */ @Override public MultiByteBuff duplicate() { + checkRefCount(); ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length]; for (int i = 0; i < this.items.length; i++) { itemsCopy[i] = items[i].duplicate(); } - return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex, - this.curItemIndex, this.markedItemIndex); + return new MultiByteBuff(refCnt, itemsCopy, this.itemBeginPos, this.limit, + this.limitedItemIndex, this.curItemIndex, this.markedItemIndex); } /** @@ -654,6 +689,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff put(byte b) { + checkRefCount(); if (this.curItem.remaining() == 0) { if (this.curItemIndex == this.items.length - 1) { throw new BufferOverflowException(); @@ -673,6 +709,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff put(int index, byte b) { + checkRefCount(); int itemIndex = getItemIndex(limit); ByteBuffer item = items[itemIndex]; item.put(index - itemBeginPos[itemIndex], b); @@ -688,6 +725,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) { + checkRefCount(); int destItemIndex = getItemIndex(offset); int srcItemIndex = getItemIndex(srcOffset); ByteBuffer destItem = this.items[destItemIndex]; @@ -723,7 +761,7 @@ public class MultiByteBuff extends ByteBuff { } private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) { - return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer() + return (buf instanceof SingleByteBuff) ? buf.nioByteBuffers()[0] : ((MultiByteBuff) buf).items[index]; } @@ -734,6 +772,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff putInt(int val) { + checkRefCount(); if (this.curItem.remaining() >= Bytes.SIZEOF_INT) { this.curItem.putInt(val); return this; @@ -784,6 +823,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff put(byte[] src, int offset, int length) { + checkRefCount(); if (this.curItem.remaining() >= length) { ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length); return this; @@ -803,6 +843,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff putLong(long val) { + checkRefCount(); if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) { this.curItem.putLong(val); return this; @@ -860,6 +901,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff skip(int length) { + checkRefCount(); // Get available bytes from this item and remaining from next int jump = 0; while (true) { @@ -882,6 +924,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public MultiByteBuff moveBack(int length) { + checkRefCount(); while (length != 0) { if (length > curItem.position()) { length -= curItem.position(); @@ -909,6 +952,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public ByteBuffer asSubByteBuffer(int length) { + checkRefCount(); if (this.curItem.remaining() >= length) { return this.curItem; } @@ -918,8 +962,8 @@ public class MultiByteBuff extends ByteBuff { ByteBuffer locCurItem = curItem; while (length > 0) { int toRead = Math.min(length, locCurItem.remaining()); - ByteBufferUtils - .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead); + ByteBufferUtils.copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, + toRead); length -= toRead; if (length == 0) break; locCurItemIndex++; @@ -945,6 +989,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) { + checkRefCount(); if (this.itemBeginPos[this.curItemIndex] <= offset) { int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex]; if (this.curItem.limit() - relOffsetInCurItem >= length) { @@ -988,6 +1033,7 @@ public class MultiByteBuff extends ByteBuff { @Override public void get(ByteBuffer out, int sourceOffset, int length) { + checkRefCount(); // Not used from real read path actually. So not going with // optimization for (int i = 0; i < length; ++i) { @@ -1007,6 +1053,7 @@ public class MultiByteBuff extends ByteBuff { */ @Override public byte[] toBytes(int offset, int length) { + checkRefCount(); byte[] output = new byte[length]; this.get(offset, output, 0, length); return output; @@ -1014,6 +1061,7 @@ public class MultiByteBuff extends ByteBuff { @Override public int read(ReadableByteChannel channel) throws IOException { + checkRefCount(); int total = 0; while (true) { // Read max possible into the current BB @@ -1034,13 +1082,19 @@ public class MultiByteBuff extends ByteBuff { } @Override + public ByteBuffer[] nioByteBuffers() { + checkRefCount(); + return this.items; + } + + @Override public boolean equals(Object obj) { if (!(obj instanceof MultiByteBuff)) return false; if (this == obj) return true; MultiByteBuff that = (MultiByteBuff) obj; if (this.capacity() != that.capacity()) return false; if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(), - that.limit()) == 0) { + that.limit()) == 0) { return true; } return false; @@ -1055,11 +1109,9 @@ public class MultiByteBuff extends ByteBuff { return hash; } - /** - * @return the ByteBuffers which this wraps. - */ - @VisibleForTesting - public ByteBuffer[] getEnclosingByteBuffers() { - return this.items; + @Override + public MultiByteBuff retain() { + refCnt.retain(); + return this; } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java new file mode 100644 index 0000000..80172b2 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.nio; + +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.util.AbstractReferenceCounted; +import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted; + +/** + * Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the + * reference count become 0, it'll call {@link Recycler#free()} once. + */ [email protected] +class RefCnt extends AbstractReferenceCounted { + + private Recycler recycler = ByteBuffAllocator.NONE; + + RefCnt(Recycler recycler) { + this.recycler = recycler; + } + + @Override + protected final void deallocate() { + this.recycler.free(); + } + + @Override + public final ReferenceCounted touch(Object hint) { + throw new UnsupportedOperationException(); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java index 6d64d7b..7205251 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java @@ -17,22 +17,24 @@ */ package org.apache.hadoop.hbase.nio; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.NONE; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import org.apache.hadoop.hbase.io.ByteBuffAllocator.Recycler; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.ObjectIntPair; import org.apache.hadoop.hbase.util.UnsafeAccess; import org.apache.hadoop.hbase.util.UnsafeAvailChecker; import org.apache.yetus.audience.InterfaceAudience; -import sun.nio.ch.DirectBuffer; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import sun.nio.ch.DirectBuffer; /** - * An implementation of ByteBuff where a single BB backs the BBI. This just acts - * as a wrapper over a normal BB - offheap or onheap + * An implementation of ByteBuff where a single BB backs the BBI. This just acts as a wrapper over a + * normal BB - offheap or onheap */ @InterfaceAudience.Private public class SingleByteBuff extends ByteBuff { @@ -48,6 +50,15 @@ public class SingleByteBuff extends ByteBuff { private Object unsafeRef = null; public SingleByteBuff(ByteBuffer buf) { + this(NONE, buf); + } + + public SingleByteBuff(Recycler recycler, ByteBuffer buf) { + this(new RefCnt(recycler), buf); + } + + private SingleByteBuff(RefCnt refCnt, ByteBuffer buf) { + this.refCnt = refCnt; this.buf = buf; if (buf.hasArray()) { this.unsafeOffset = UnsafeAccess.BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset(); @@ -59,63 +70,74 @@ public class SingleByteBuff extends ByteBuff { @Override public int position() { + checkRefCount(); return this.buf.position(); } @Override public SingleByteBuff position(int position) { + checkRefCount(); this.buf.position(position); return this; } @Override public SingleByteBuff skip(int len) { + checkRefCount(); this.buf.position(this.buf.position() + len); return this; } @Override public SingleByteBuff moveBack(int len) { + checkRefCount(); this.buf.position(this.buf.position() - len); return this; } @Override public int capacity() { + checkRefCount(); return this.buf.capacity(); } @Override public int limit() { + checkRefCount(); return this.buf.limit(); } @Override public SingleByteBuff limit(int limit) { + checkRefCount(); this.buf.limit(limit); return this; } @Override public SingleByteBuff rewind() { + checkRefCount(); this.buf.rewind(); return this; } @Override public SingleByteBuff mark() { + checkRefCount(); this.buf.mark(); return this; } @Override public ByteBuffer asSubByteBuffer(int length) { + checkRefCount(); // Just return the single BB that is available return this.buf; } @Override public void asSubByteBuffer(int offset, int length, ObjectIntPair<ByteBuffer> pair) { + checkRefCount(); // Just return the single BB that is available pair.setFirst(this.buf); pair.setSecond(offset); @@ -123,37 +145,44 @@ public class SingleByteBuff extends ByteBuff { @Override public int remaining() { + checkRefCount(); return this.buf.remaining(); } @Override public boolean hasRemaining() { + checkRefCount(); return buf.hasRemaining(); } @Override public SingleByteBuff reset() { + checkRefCount(); this.buf.reset(); return this; } @Override public SingleByteBuff slice() { - return new SingleByteBuff(this.buf.slice()); + checkRefCount(); + return new SingleByteBuff(this.refCnt, this.buf.slice()); } @Override public SingleByteBuff duplicate() { - return new SingleByteBuff(this.buf.duplicate()); + checkRefCount(); + return new SingleByteBuff(this.refCnt, this.buf.duplicate()); } @Override public byte get() { + checkRefCount(); return buf.get(); } @Override public byte get(int index) { + checkRefCount(); if (UNSAFE_AVAIL) { return UnsafeAccess.toByte(this.unsafeRef, this.unsafeOffset + index); } @@ -162,29 +191,34 @@ public class SingleByteBuff extends ByteBuff { @Override public byte getByteAfterPosition(int offset) { + checkRefCount(); return get(this.buf.position() + offset); } @Override public SingleByteBuff put(byte b) { + checkRefCount(); this.buf.put(b); return this; } @Override public SingleByteBuff put(int index, byte b) { + checkRefCount(); buf.put(index, b); return this; } @Override public void get(byte[] dst, int offset, int length) { + checkRefCount(); ByteBufferUtils.copyFromBufferToArray(dst, buf, buf.position(), offset, length); buf.position(buf.position() + length); } @Override public void get(int sourceOffset, byte[] dst, int offset, int length) { + checkRefCount(); ByteBufferUtils.copyFromBufferToArray(dst, buf, sourceOffset, offset, length); } @@ -195,9 +229,10 @@ public class SingleByteBuff extends ByteBuff { @Override public SingleByteBuff put(int offset, ByteBuff src, int srcOffset, int length) { + checkRefCount(); if (src instanceof SingleByteBuff) { ByteBufferUtils.copyFromBufferToBuffer(((SingleByteBuff) src).buf, this.buf, srcOffset, - offset, length); + offset, length); } else { // TODO we can do some optimization here? Call to asSubByteBuffer might // create a copy. @@ -205,7 +240,7 @@ public class SingleByteBuff extends ByteBuff { src.asSubByteBuffer(srcOffset, length, pair); if (pair.getFirst() != null) { ByteBufferUtils.copyFromBufferToBuffer(pair.getFirst(), this.buf, pair.getSecond(), offset, - length); + length); } } return this; @@ -213,37 +248,44 @@ public class SingleByteBuff extends ByteBuff { @Override public SingleByteBuff put(byte[] src, int offset, int length) { + checkRefCount(); ByteBufferUtils.copyFromArrayToBuffer(this.buf, src, offset, length); return this; } @Override public SingleByteBuff put(byte[] src) { + checkRefCount(); return put(src, 0, src.length); } @Override public boolean hasArray() { + checkRefCount(); return this.buf.hasArray(); } @Override public byte[] array() { + checkRefCount(); return this.buf.array(); } @Override public int arrayOffset() { + checkRefCount(); return this.buf.arrayOffset(); } @Override public short getShort() { + checkRefCount(); return this.buf.getShort(); } @Override public short getShort(int index) { + checkRefCount(); if (UNSAFE_UNALIGNED) { return UnsafeAccess.toShort(unsafeRef, unsafeOffset + index); } @@ -252,22 +294,26 @@ public class SingleByteBuff extends ByteBuff { @Override public short getShortAfterPosition(int offset) { + checkRefCount(); return getShort(this.buf.position() + offset); } @Override public int getInt() { + checkRefCount(); return this.buf.getInt(); } @Override public SingleByteBuff putInt(int value) { + checkRefCount(); ByteBufferUtils.putInt(this.buf, value); return this; } @Override public int getInt(int index) { + checkRefCount(); if (UNSAFE_UNALIGNED) { return UnsafeAccess.toInt(unsafeRef, unsafeOffset + index); } @@ -276,22 +322,26 @@ public class SingleByteBuff extends ByteBuff { @Override public int getIntAfterPosition(int offset) { + checkRefCount(); return getInt(this.buf.position() + offset); } @Override public long getLong() { + checkRefCount(); return this.buf.getLong(); } @Override public SingleByteBuff putLong(long value) { + checkRefCount(); ByteBufferUtils.putLong(this.buf, value); return this; } @Override public long getLong(int index) { + checkRefCount(); if (UNSAFE_UNALIGNED) { return UnsafeAccess.toLong(unsafeRef, unsafeOffset + index); } @@ -300,11 +350,13 @@ public class SingleByteBuff extends ByteBuff { @Override public long getLongAfterPosition(int offset) { + checkRefCount(); return getLong(this.buf.position() + offset); } @Override public byte[] toBytes(int offset, int length) { + checkRefCount(); byte[] output = new byte[length]; ByteBufferUtils.copyFromBufferToArray(output, buf, offset, 0, length); return output; @@ -312,18 +364,28 @@ public class SingleByteBuff extends ByteBuff { @Override public void get(ByteBuffer out, int sourceOffset, int length) { + checkRefCount(); ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length); } @Override public int read(ReadableByteChannel channel) throws IOException { + checkRefCount(); return channelRead(channel, buf); } @Override + public ByteBuffer[] nioByteBuffers() { + checkRefCount(); + return new ByteBuffer[] { this.buf }; + } + + @Override public boolean equals(Object obj) { - if(!(obj instanceof SingleByteBuff)) return false; - return this.buf.equals(((SingleByteBuff)obj).buf); + if (!(obj instanceof SingleByteBuff)) { + return false; + } + return this.buf.equals(((SingleByteBuff) obj).buf); } @Override @@ -331,11 +393,9 @@ public class SingleByteBuff extends ByteBuff { return this.buf.hashCode(); } - /** - * @return the ByteBuffer which this wraps. - */ - @VisibleForTesting - public ByteBuffer getEnclosingByteBuffer() { - return this.buf; + @Override + public SingleByteBuff retain() { + refCnt.retain(); + return this; } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index 2e14b13..d023339 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -27,9 +27,9 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.MultiByteBuff; -import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.util.StringUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -311,10 +311,6 @@ public class ByteBufferArray { srcIndex += cnt; } assert srcIndex == len; - if (mbb.length > 1) { - return new MultiByteBuff(mbb); - } else { - return new SingleByteBuff(mbb[0]); - } + return ByteBuffAllocator.wrap(mbb); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 3ea0a5c..98bc88a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -30,6 +30,7 @@ import java.util.Arrays; import org.apache.hadoop.hbase.io.ByteBufferWriter; import org.apache.hadoop.hbase.io.util.StreamUtils; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.WritableUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -348,25 +349,39 @@ public final class ByteBufferUtils { } } - /** - * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a - * {@link ByteBuffer}. - */ - public static long readVLong(ByteBuffer in) { - byte firstByte = in.get(); + private interface ByteVisitor { + byte get(); + } + + private static long readVLong(ByteVisitor visitor) { + byte firstByte = visitor.get(); int len = WritableUtils.decodeVIntSize(firstByte); if (len == 1) { return firstByte; } long i = 0; - for (int idx = 0; idx < len-1; idx++) { - byte b = in.get(); + for (int idx = 0; idx < len - 1; idx++) { + byte b = visitor.get(); i = i << 8; i = i | (b & 0xFF); } return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i); } + /** + * Similar to {@link WritableUtils#readVLong(DataInput)} but reads from a {@link ByteBuffer}. + */ + public static long readVLong(ByteBuffer in) { + return readVLong(in::get); + } + + /** + * Similar to {@link WritableUtils#readVLong(java.io.DataInput)} but reads from a + * {@link ByteBuff}. + */ + public static long readVLong(ByteBuff in) { + return readVLong(in::get); + } /** * Put in buffer integer using 7 bit encoding. For each written byte: diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java new file mode 100644 index 0000000..0976c11 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBuffAllocator.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RPCTests.class, SmallTests.class }) +public class TestByteBuffAllocator { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestByteBuffAllocator.class); + + @Test + public void testAllocateByteBuffToReadInto() { + int maxBuffersInPool = 10; + int bufSize = 6 * 1024; + ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, bufSize / 6); + ByteBuff buff = alloc.allocate(10 * bufSize); + buff.release(); + // When the request size is less than 1/6th of the pool buffer size. We should use on demand + // created on heap Buffer + buff = alloc.allocate(200); + assertTrue(buff.hasArray()); + assertEquals(maxBuffersInPool, alloc.getQueueSize()); + buff.release(); + // When the request size is > 1/6th of the pool buffer size. + buff = alloc.allocate(1024); + assertFalse(buff.hasArray()); + assertEquals(maxBuffersInPool - 1, alloc.getQueueSize()); + buff.release();// ByteBuffDeallocaor#free should put back the BB to pool. + assertEquals(maxBuffersInPool, alloc.getQueueSize()); + // Request size> pool buffer size + buff = alloc.allocate(7 * 1024); + assertFalse(buff.hasArray()); + assertTrue(buff instanceof MultiByteBuff); + ByteBuffer[] bbs = buff.nioByteBuffers(); + assertEquals(2, bbs.length); + assertTrue(bbs[0].isDirect()); + assertTrue(bbs[1].isDirect()); + assertEquals(6 * 1024, bbs[0].limit()); + assertEquals(1024, bbs[1].limit()); + assertEquals(maxBuffersInPool - 2, alloc.getQueueSize()); + buff.release(); + assertEquals(maxBuffersInPool, alloc.getQueueSize()); + + buff = alloc.allocate(6 * 1024 + 200); + assertFalse(buff.hasArray()); + assertTrue(buff instanceof MultiByteBuff); + bbs = buff.nioByteBuffers(); + assertEquals(2, bbs.length); + assertTrue(bbs[0].isDirect()); + assertFalse(bbs[1].isDirect()); + assertEquals(6 * 1024, bbs[0].limit()); + assertEquals(200, bbs[1].limit()); + assertEquals(maxBuffersInPool - 1, alloc.getQueueSize()); + buff.release(); + assertEquals(maxBuffersInPool, alloc.getQueueSize()); + + alloc.allocate(bufSize * (maxBuffersInPool - 1)); + buff = alloc.allocate(20 * 1024); + assertFalse(buff.hasArray()); + assertTrue(buff instanceof MultiByteBuff); + bbs = buff.nioByteBuffers(); + assertEquals(2, bbs.length); + assertTrue(bbs[0].isDirect()); + assertFalse(bbs[1].isDirect()); + assertEquals(6 * 1024, bbs[0].limit()); + assertEquals(14 * 1024, bbs[1].limit()); + assertEquals(0, alloc.getQueueSize()); + buff.release(); + assertEquals(1, alloc.getQueueSize()); + alloc.allocateOneBuffer(); + + buff = alloc.allocate(7 * 1024); + assertTrue(buff.hasArray()); + assertTrue(buff instanceof SingleByteBuff); + assertEquals(7 * 1024, buff.nioByteBuffers()[0].limit()); + buff.release(); + } + + @Test + public void testNegativeAllocatedSize() { + int maxBuffersInPool = 10; + ByteBuffAllocator allocator = + new ByteBuffAllocator(true, maxBuffersInPool, 6 * 1024, 1024); + try { + allocator.allocate(-1); + fail("Should throw exception when size < 0"); + } catch (IllegalArgumentException e) { + // expected exception + } + ByteBuff bb = allocator.allocate(0); + bb.release(); + } + + @Test + public void testAllocateOneBuffer() { + // Allocate from on-heap + ByteBuffAllocator allocator = ByteBuffAllocator.createOnHeap(); + ByteBuff buf = allocator.allocateOneBuffer(); + assertTrue(buf.hasArray()); + assertEquals(ByteBuffAllocator.DEFAULT_BUFFER_SIZE, buf.remaining()); + buf.release(); + + // Allocate from off-heap + int bufSize = 10; + allocator = new ByteBuffAllocator(true, 1, 10, 3); + buf = allocator.allocateOneBuffer(); + assertFalse(buf.hasArray()); + assertEquals(buf.remaining(), bufSize); + // The another one will be allocated from on-heap because the pool has only one ByteBuffer, + // and still not be cleaned. + ByteBuff buf2 = allocator.allocateOneBuffer(); + assertTrue(buf2.hasArray()); + assertEquals(buf2.remaining(), bufSize); + // free the first one + buf.release(); + // The next one will be off-heap again. + buf = allocator.allocateOneBuffer(); + assertFalse(buf.hasArray()); + assertEquals(buf.remaining(), bufSize); + buf.release(); + } + + @Test + public void testReferenceCount() { + int bufSize = 64; + ByteBuffAllocator alloc = new ByteBuffAllocator(true, 2, bufSize, 3); + ByteBuff buf1 = alloc.allocate(bufSize * 2); + assertFalse(buf1.hasArray()); + // The next one will be allocated from heap + ByteBuff buf2 = alloc.allocateOneBuffer(); + assertTrue(buf2.hasArray()); + + // duplicate the buf2, if the dup released, buf2 will also be released (SingleByteBuffer) + ByteBuff dup2 = buf2.duplicate(); + dup2.release(); + assertEquals(0, buf2.refCnt()); + assertEquals(0, dup2.refCnt()); + assertEquals(0, alloc.getQueueSize()); + assertException(dup2::position); + assertException(buf2::position); + + // duplicate the buf1, if the dup1 released, buf1 will also be released (MultipleByteBuffer) + ByteBuff dup1 = buf1.duplicate(); + dup1.release(); + assertEquals(0, buf1.refCnt()); + assertEquals(0, dup1.refCnt()); + assertEquals(2, alloc.getQueueSize()); + assertException(dup1::position); + assertException(buf1::position); + + // slice the buf3, if the slice3 released, buf3 will also be released (SingleByteBuffer) + ByteBuff buf3 = alloc.allocateOneBuffer(); + assertFalse(buf3.hasArray()); + ByteBuff slice3 = buf3.slice(); + slice3.release(); + assertEquals(0, buf3.refCnt()); + assertEquals(0, slice3.refCnt()); + assertEquals(2, alloc.getQueueSize()); + + // slice the buf4, if the slice4 released, buf4 will also be released (MultipleByteBuffer) + ByteBuff buf4 = alloc.allocate(bufSize * 2); + assertFalse(buf4.hasArray()); + ByteBuff slice4 = buf4.slice(); + slice4.release(); + assertEquals(0, buf4.refCnt()); + assertEquals(0, slice4.refCnt()); + assertEquals(2, alloc.getQueueSize()); + + // Test multiple reference for the same ByteBuff (SingleByteBuff) + ByteBuff buf5 = alloc.allocateOneBuffer(); + ByteBuff slice5 = buf5.duplicate().duplicate().duplicate().slice().slice(); + slice5.release(); + assertEquals(0, buf5.refCnt()); + assertEquals(0, slice5.refCnt()); + assertEquals(2, alloc.getQueueSize()); + assertException(slice5::position); + assertException(buf5::position); + + // Test multiple reference for the same ByteBuff (SingleByteBuff) + ByteBuff buf6 = alloc.allocate(bufSize >> 2); + ByteBuff slice6 = buf6.duplicate().duplicate().duplicate().slice().slice(); + slice6.release(); + assertEquals(0, buf6.refCnt()); + assertEquals(0, slice6.refCnt()); + assertEquals(2, alloc.getQueueSize()); + + // Test retain the parent SingleByteBuff (duplicate) + ByteBuff parent = alloc.allocateOneBuffer(); + ByteBuff child = parent.duplicate(); + child.retain(); + parent.release(); + assertEquals(1, child.refCnt()); + assertEquals(1, parent.refCnt()); + assertEquals(1, alloc.getQueueSize()); + parent.release(); + assertEquals(0, child.refCnt()); + assertEquals(0, parent.refCnt()); + assertEquals(2, alloc.getQueueSize()); + + // Test retain parent MultiByteBuff (duplicate) + parent = alloc.allocate(bufSize << 1); + child = parent.duplicate(); + child.retain(); + parent.release(); + assertEquals(1, child.refCnt()); + assertEquals(1, parent.refCnt()); + assertEquals(0, alloc.getQueueSize()); + parent.release(); + assertEquals(0, child.refCnt()); + assertEquals(0, parent.refCnt()); + assertEquals(2, alloc.getQueueSize()); + + // Test retain the parent SingleByteBuff (slice) + parent = alloc.allocateOneBuffer(); + child = parent.slice(); + child.retain(); + parent.release(); + assertEquals(1, child.refCnt()); + assertEquals(1, parent.refCnt()); + assertEquals(1, alloc.getQueueSize()); + parent.release(); + assertEquals(0, child.refCnt()); + assertEquals(0, parent.refCnt()); + assertEquals(2, alloc.getQueueSize()); + + // Test retain parent MultiByteBuff (slice) + parent = alloc.allocate(bufSize << 1); + child = parent.slice(); + child.retain(); + parent.release(); + assertEquals(1, child.refCnt()); + assertEquals(1, parent.refCnt()); + assertEquals(0, alloc.getQueueSize()); + parent.release(); + assertEquals(0, child.refCnt()); + assertEquals(0, parent.refCnt()); + assertEquals(2, alloc.getQueueSize()); + } + + @Test + public void testReverseRef() { + int bufSize = 64; + ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3); + ByteBuff buf1 = alloc.allocate(bufSize); + ByteBuff dup1 = buf1.duplicate(); + assertEquals(1, buf1.refCnt()); + assertEquals(1, dup1.refCnt()); + buf1.release(); + assertEquals(0, buf1.refCnt()); + assertEquals(0, dup1.refCnt()); + assertEquals(1, alloc.getQueueSize()); + assertException(buf1::position); + assertException(dup1::position); + } + + @Test + public void testByteBuffUnsupportedMethods() { + int bufSize = 64; + ByteBuffAllocator alloc = new ByteBuffAllocator(true, 1, bufSize, 3); + ByteBuff buf = alloc.allocate(bufSize); + assertException(() -> buf.retain(2)); + assertException(() -> buf.release(2)); + assertException(() -> buf.touch()); + assertException(() -> buf.touch(new Object())); + } + + private void assertException(Runnable r) { + try { + r.run(); + fail(); + } catch (Exception e) { + // expected exception. + } + } +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java index 2f7a869..3ac7a75 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferListOutputStream.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.nio.ByteBuffer; import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.ByteBufferUtils; @@ -40,29 +41,30 @@ public class TestByteBufferListOutputStream { @Test public void testWrites() throws Exception { - ByteBufferPool pool = new ByteBufferPool(10, 3); - ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool); + ByteBuffAllocator alloc = new ByteBuffAllocator(true, 3, 10, 10 / 6); + ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(alloc); bbos.write(2);// Write a byte bbos.writeInt(100);// Write an int byte[] b = Bytes.toBytes("row123");// 6 bytes bbos.write(b); + assertEquals(2, bbos.allBufs.size()); // Just use the 3rd BB from pool so that pabos, on request, wont get one - ByteBuffer bb1 = pool.getBuffer(); + ByteBuff bb1 = alloc.allocateOneBuffer(); ByteBuffer bb = ByteBuffer.wrap(Bytes.toBytes("row123_cf1_q1"));// 13 bytes bbos.write(bb, 0, bb.capacity()); - pool.putbackBuffer(bb1); + bb1.release(); bbos.writeInt(123); bbos.writeInt(124); - assertEquals(0, pool.getQueueSize()); + assertEquals(0, alloc.getQueueSize()); List<ByteBuffer> allBufs = bbos.getByteBuffers(); assertEquals(4, allBufs.size()); - assertEquals(3, bbos.bufsFromPool.size()); + assertEquals(4, bbos.allBufs.size()); ByteBuffer b1 = allBufs.get(0); assertEquals(10, b1.remaining()); assertEquals(2, b1.get()); assertEquals(100, b1.getInt()); byte[] bActual = new byte[b.length]; - b1.get(bActual, 0, 5);//5 bytes in 1st BB + b1.get(bActual, 0, 5);// 5 bytes in 1st BB ByteBuffer b2 = allBufs.get(1); assertEquals(10, b2.remaining()); b2.get(bActual, 5, 1);// Remaining 1 byte in 2nd BB @@ -78,6 +80,6 @@ public class TestByteBufferListOutputStream { assertEquals(4, b4.remaining()); assertEquals(124, b4.getInt()); bbos.releaseResources(); - assertEquals(3, pool.getQueueSize()); + assertEquals(3, alloc.getQueueSize()); } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java deleted file mode 100644 index 44d2f45..0000000 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferPool.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import static org.junit.Assert.assertEquals; - -import java.nio.ByteBuffer; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.testclassification.IOTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ IOTests.class, SmallTests.class }) -public class TestByteBufferPool { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestByteBufferPool.class); - - @Test - public void testOffheapBBPool() throws Exception { - boolean directByteBuffer = true; - testBBPool(10, 100, directByteBuffer); - } - - @Test - public void testOnheapBBPool() throws Exception { - boolean directByteBuffer = false; - testBBPool(10, 100, directByteBuffer); - } - - private void testBBPool(int maxPoolSize, int bufferSize, boolean directByteBuffer) { - ByteBufferPool pool = new ByteBufferPool(bufferSize, maxPoolSize, directByteBuffer); - for (int i = 0; i < maxPoolSize; i++) { - ByteBuffer buffer = pool.getBuffer(); - assertEquals(0, buffer.position()); - assertEquals(bufferSize, buffer.limit()); - assertEquals(directByteBuffer, buffer.isDirect()); - } - assertEquals(0, pool.getQueueSize()); - ByteBuffer bb = directByteBuffer ? ByteBuffer.allocate(bufferSize) - : ByteBuffer.allocateDirect(bufferSize); - pool.putbackBuffer(bb); - assertEquals(0, pool.getQueueSize()); - bb = directByteBuffer ? ByteBuffer.allocateDirect(bufferSize + 1) - : ByteBuffer.allocate(bufferSize + 1); - pool.putbackBuffer(bb); - assertEquals(0, pool.getQueueSize()); - } -} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java index 84cf7a4..fcfb77a 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java @@ -286,12 +286,12 @@ public class TestMultiByteBuff { multi.putInt(45); multi.position(1); multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG)); - MultiByteBuff sliced = multi.slice(); + ByteBuff sliced = multi.slice(); assertEquals(0, sliced.position()); assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit()); assertEquals(l1, sliced.getLong()); assertEquals(l2, sliced.getLong()); - MultiByteBuff dup = multi.duplicate(); + ByteBuff dup = multi.duplicate(); assertEquals(1, dup.position()); assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit()); assertEquals(l1, dup.getLong()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java index 0224dea..a842967 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/Cacheable.java @@ -69,11 +69,10 @@ public interface Cacheable extends HeapSize { /** * SHARED means when this Cacheable is read back from cache it refers to the same memory area as - * used by the cache for caching it. - * EXCLUSIVE means when this Cacheable is read back from cache, the data was copied to an - * exclusive memory area of this Cacheable. + * used by the cache for caching it. EXCLUSIVE means when this Cacheable is read back from cache, + * the data was copied to an exclusive memory area of this Cacheable. */ - public static enum MemoryType { + enum MemoryType { SHARED, EXCLUSIVE } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java index 80b1288..5ed3d2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java @@ -127,7 +127,7 @@ public class NettyRpcFrameDecoder extends ByteToMessageDecoder { NettyServerCall reqTooBig = new NettyServerCall(header.getCallId(), connection.service, null, null, null, null, connection, 0, connection.addr, System.currentTimeMillis(), 0, - connection.rpcServer.reservoir, connection.rpcServer.cellBlockBuilder, null); + connection.rpcServer.bbAllocator, connection.rpcServer.cellBlockBuilder, null); connection.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 742a728..bba1bed 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -187,7 +187,7 @@ public class NettyRpcServer extends RpcServer { Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, int timeout) throws IOException { NettyServerCall fakeCall = new NettyServerCall(-1, service, md, null, param, cellScanner, null, - -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null); + -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null); return call(fakeCall, status); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java index 2fae311..8dc08c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerCall.java @@ -21,9 +21,9 @@ import java.io.IOException; import java.net.InetAddress; import org.apache.hadoop.hbase.CellScanner; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; @@ -39,10 +39,10 @@ class NettyServerCall extends ServerCall<NettyServerRpcConnection> { NettyServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, NettyServerRpcConnection connection, long size, - InetAddress remoteAddress, long receiveTime, int timeout, - ByteBufferPool reservoir, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { - super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, - receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup); + InetAddress remoteAddress, long receiveTime, int timeout, ByteBuffAllocator bbAllocator, + CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { + super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime, + timeout, bbAllocator, cellBlockBuilder, reqCleanup); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java index ffa16bf..2f97f53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyServerRpcConnection.java @@ -59,12 +59,7 @@ class NettyServerRpcConnection extends ServerRpcConnection { void process(final ByteBuf buf) throws IOException, InterruptedException { if (connectionHeaderRead) { - this.callCleanup = new RpcServer.CallCleanup() { - @Override - public void run() { - buf.release(); - } - }; + this.callCleanup = buf::release; process(new SingleByteBuff(buf.nioBuffer())); } else { ByteBuffer connectionHeader = ByteBuffer.allocate(buf.readableBytes()); @@ -121,7 +116,7 @@ class NettyServerRpcConnection extends ServerRpcConnection { long size, final InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { return new NettyServerCall(id, service, md, header, param, cellScanner, this, size, - remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir, + remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, reqCleanup); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 3ab63dd..ac8c26c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -26,7 +26,6 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -38,16 +37,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CallQueueTooBigException; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; -import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler; import org.apache.hadoop.hbase.monitoring.TaskMonitor; -import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.MultiByteBuff; -import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.security.SaslUtil; import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection; @@ -210,11 +205,7 @@ public abstract class RpcServer implements RpcServerInterface, protected UserProvider userProvider; - protected final ByteBufferPool reservoir; - // The requests and response will use buffers from ByteBufferPool, when the size of the - // request/response is at least this size. - // We make this to be 1/6th of the pool buffer size. - protected final int minSizeForReservoirUse; + protected final ByteBuffAllocator bbAllocator; protected volatile boolean allowFallbackToSimpleAuth; @@ -225,7 +216,7 @@ public abstract class RpcServer implements RpcServerInterface, private RSRpcServices rsRpcServices; @FunctionalInterface - protected static interface CallCleanup { + protected interface CallCleanup { void run(); } @@ -266,32 +257,7 @@ public abstract class RpcServer implements RpcServerInterface, final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { - if (reservoirEnabled) { - int poolBufSize = conf.getInt(ByteBufferPool.BUFFER_SIZE_KEY, - ByteBufferPool.DEFAULT_BUFFER_SIZE); - // The max number of buffers to be pooled in the ByteBufferPool. The default value been - // selected based on the #handlers configured. When it is read request, 2 MB is the max size - // at which we will send back one RPC request. Means max we need 2 MB for creating the - // response cell block. (Well it might be much lesser than this because in 2 MB size calc, we - // include the heap size overhead of each cells also.) Considering 2 MB, we will need - // (2 * 1024 * 1024) / poolBufSize buffers to make the response cell block. Pool buffer size - // is by default 64 KB. - // In case of read request, at the end of the handler process, we will make the response - // cellblock and add the Call to connection's response Q and a single Responder thread takes - // connections and responses from that one by one and do the socket write. So there is chances - // that by the time a handler originated response is actually done writing to socket and so - // released the BBs it used, the handler might have processed one more read req. On an avg 2x - // we consider and consider that also for the max buffers to pool - int bufsForTwoMB = (2 * 1024 * 1024) / poolBufSize; - int maxPoolSize = conf.getInt(ByteBufferPool.MAX_POOL_SIZE_KEY, - conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, - HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT) * bufsForTwoMB * 2); - this.reservoir = new ByteBufferPool(poolBufSize, maxPoolSize); - this.minSizeForReservoirUse = getMinSizeForReservoirUse(this.reservoir); - } else { - reservoir = null; - this.minSizeForReservoirUse = Integer.MAX_VALUE;// reservoir itself not in place. - } + this.bbAllocator = ByteBuffAllocator.create(conf, reservoirEnabled); this.server = server; this.services = services; this.bindAddress = bindAddress; @@ -325,11 +291,6 @@ public abstract class RpcServer implements RpcServerInterface, this.scheduler = scheduler; } - @VisibleForTesting - static int getMinSizeForReservoirUse(ByteBufferPool pool) { - return pool.getBufferSize() / 6; - } - @Override public void onConfigurationChange(Configuration newConf) { initReconfigurable(newConf); @@ -652,55 +613,6 @@ public abstract class RpcServer implements RpcServerInterface, } /** - * This is extracted to a static method for better unit testing. We try to get buffer(s) from pool - * as much as possible. - * - * @param pool The ByteBufferPool to use - * @param minSizeForPoolUse Only for buffer size above this, we will try to use pool. Any buffer - * need of size below this, create on heap ByteBuffer. - * @param reqLen Bytes count in request - */ - @VisibleForTesting - static Pair<ByteBuff, CallCleanup> allocateByteBuffToReadInto(ByteBufferPool pool, - int minSizeForPoolUse, int reqLen) { - ByteBuff resultBuf; - List<ByteBuffer> bbs = new ArrayList<>((reqLen / pool.getBufferSize()) + 1); - int remain = reqLen; - ByteBuffer buf = null; - while (remain >= minSizeForPoolUse && (buf = pool.getBuffer()) != null) { - bbs.add(buf); - remain -= pool.getBufferSize(); - } - ByteBuffer[] bufsFromPool = null; - if (bbs.size() > 0) { - bufsFromPool = new ByteBuffer[bbs.size()]; - bbs.toArray(bufsFromPool); - } - if (remain > 0) { - bbs.add(ByteBuffer.allocate(remain)); - } - if (bbs.size() > 1) { - ByteBuffer[] items = new ByteBuffer[bbs.size()]; - bbs.toArray(items); - resultBuf = new MultiByteBuff(items); - } else { - // We are backed by single BB - resultBuf = new SingleByteBuff(bbs.get(0)); - } - resultBuf.limit(reqLen); - if (bufsFromPool != null) { - final ByteBuffer[] bufsFromPoolFinal = bufsFromPool; - return new Pair<>(resultBuf, () -> { - // Return back all the BBs to pool - for (int i = 0; i < bufsFromPoolFinal.length; i++) { - pool.putbackBuffer(bufsFromPoolFinal[i]); - } - }); - } - return new Pair<>(resultBuf, null); - } - - /** * Needed for features such as delayed calls. We need to be able to store the current call * so that we can complete it later or ask questions of what is supported by the current ongoing * call. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java index cf1cf9a..f93f3a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java @@ -26,10 +26,10 @@ import java.util.Optional; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.io.ByteBufferListOutputStream; -import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.security.User; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; @@ -67,7 +67,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc protected long startTime; protected final long deadline;// the deadline to handle this call, if exceed we can drop it. - protected final ByteBufferPool reservoir; + protected final ByteBuffAllocator bbAllocator; protected final CellBlockBuilder cellBlockBuilder; @@ -91,11 +91,11 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc private long exceptionSize = 0; private final boolean retryImmediatelySupported; - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", - justification="Can't figure why this complaint is happening... see below") + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH", + justification = "Can't figure why this complaint is happening... see below") ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header, - Message param, CellScanner cellScanner, T connection, long size, - InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, + Message param, CellScanner cellScanner, T connection, long size, InetAddress remoteAddress, + long receiveTime, int timeout, ByteBuffAllocator byteBuffAllocator, CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup) { this.id = id; this.service = service; @@ -118,7 +118,7 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc this.remoteAddress = remoteAddress; this.timeout = timeout; this.deadline = this.timeout > 0 ? this.receiveTime + this.timeout : Long.MAX_VALUE; - this.reservoir = reservoir; + this.bbAllocator = byteBuffAllocator; this.cellBlockBuilder = cellBlockBuilder; this.reqCleanup = reqCleanup; } @@ -199,9 +199,9 @@ abstract class ServerCall<T extends ServerRpcConnection> implements RpcCall, Rpc // high when we can avoid a big buffer allocation on each rpc. List<ByteBuffer> cellBlock = null; int cellBlockSize = 0; - if (this.reservoir != null) { + if (bbAllocator.isReservoirEnabled()) { this.cellBlockStream = this.cellBlockBuilder.buildCellBlockStream(this.connection.codec, - this.connection.compressionCodec, cells, this.reservoir); + this.connection.compressionCodec, cells, bbAllocator); if (this.cellBlockStream != null) { cellBlock = this.cellBlockStream.getByteBuffers(); cellBlockSize = this.cellBlockStream.size(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java index 2a8cfbe..f3f7807 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcServer.java @@ -488,7 +488,7 @@ public class SimpleRpcServer extends RpcServer { Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime, int timeout) throws IOException { SimpleServerCall fakeCall = new SimpleServerCall(-1, service, md, null, param, cellScanner, - null, -1, null, receiveTime, timeout, reservoir, cellBlockBuilder, null, null); + null, -1, null, receiveTime, timeout, bbAllocator, cellBlockBuilder, null, null); return call(fakeCall, status); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java index 6084138..311b4c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerCall.java @@ -21,9 +21,9 @@ import java.io.IOException; import java.net.InetAddress; import org.apache.hadoop.hbase.CellScanner; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteBufferPool; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; @@ -42,11 +42,12 @@ class SimpleServerCall extends ServerCall<SimpleServerRpcConnection> { justification = "Can't figure why this complaint is happening... see below") SimpleServerCall(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, - SimpleServerRpcConnection connection, long size, - final InetAddress remoteAddress, long receiveTime, int timeout, ByteBufferPool reservoir, - CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, SimpleRpcServerResponder responder) { - super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, - receiveTime, timeout, reservoir, cellBlockBuilder, reqCleanup); + SimpleServerRpcConnection connection, long size, final InetAddress remoteAddress, + long receiveTime, int timeout, ByteBuffAllocator bbAllocator, + CellBlockBuilder cellBlockBuilder, CallCleanup reqCleanup, + SimpleRpcServerResponder responder) { + super(id, service, md, header, param, cellScanner, connection, size, remoteAddress, receiveTime, + timeout, bbAllocator, cellBlockBuilder, reqCleanup); this.responder = responder; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java index b4b5f33..01127cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java @@ -36,14 +36,12 @@ import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.exceptions.RequestTooBigException; import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream; import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.util.Pair; /** Reads calls from a connection and queues them for handling. */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT", @@ -212,7 +210,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection { // Notify the client about the offending request SimpleServerCall reqTooBig = new SimpleServerCall(header.getCallId(), this.service, null, null, null, null, this, 0, this.addr, System.currentTimeMillis(), 0, - this.rpcServer.reservoir, this.rpcServer.cellBlockBuilder, null, responder); + this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, null, responder); this.rpcServer.metrics.exception(SimpleRpcServer.REQUEST_TOO_BIG_EXCEPTION); // Make sure the client recognizes the underlying exception // Otherwise, throw a DoNotRetryIOException. @@ -255,24 +253,8 @@ class SimpleServerRpcConnection extends ServerRpcConnection { // It creates the ByteBuff and CallCleanup and assign to Connection instance. private void initByteBuffToReadInto(int length) { - // We create random on heap buffers are read into those when - // 1. ByteBufferPool is not there. - // 2. When the size of the req is very small. Using a large sized (64 KB) buffer from pool is - // waste then. Also if all the reqs are of this size, we will be creating larger sized - // buffers and pool them permanently. This include Scan/Get request and DDL kind of reqs like - // RegionOpen. - // 3. If it is an initial handshake signal or initial connection request. Any way then - // condition 2 itself will match - // 4. When SASL use is ON. - if (this.rpcServer.reservoir == null || skipInitialSaslHandshake || !connectionHeaderRead || - useSasl || length < this.rpcServer.minSizeForReservoirUse) { - this.data = new SingleByteBuff(ByteBuffer.allocate(length)); - } else { - Pair<ByteBuff, CallCleanup> pair = RpcServer.allocateByteBuffToReadInto( - this.rpcServer.reservoir, this.rpcServer.minSizeForReservoirUse, length); - this.data = pair.getFirst(); - this.callCleanup = pair.getSecond(); - } + this.data = rpcServer.bbAllocator.allocate(length); + this.callCleanup = data::release; } protected int channelDataRead(ReadableByteChannel channel, ByteBuff buf) throws IOException { @@ -345,7 +327,7 @@ class SimpleServerRpcConnection extends ServerRpcConnection { RequestHeader header, Message param, CellScanner cellScanner, long size, InetAddress remoteAddress, int timeout, CallCleanup reqCleanup) { return new SimpleServerCall(id, service, md, header, param, cellScanner, this, size, - remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.reservoir, + remoteAddress, System.currentTimeMillis(), timeout, this.rpcServer.bbAllocator, this.rpcServer.cellBlockBuilder, reqCleanup, this.responder); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java index 75800ba..8a993b8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY; import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER; import static org.junit.Assert.assertEquals; @@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.MemoryCompactionPolicy; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; -import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.regionserver.CompactingMemStore; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -94,7 +94,7 @@ public class TestAsyncTableGetMultiThreaded { protected static void setUp(MemoryCompactionPolicy memoryCompaction) throws Exception { TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none"); TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L); - TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100); + TEST_UTIL.getConfiguration().setInt(MAX_BUFFER_COUNT_KEY, 100); TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, String.valueOf(memoryCompaction)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java index 267e9e8..abf20dd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestServerLoadDurability.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -26,7 +28,6 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.io.ByteBufferPool; import org.apache.hadoop.hbase.ipc.NettyRpcServer; import org.apache.hadoop.hbase.ipc.RpcServerFactory; import org.apache.hadoop.hbase.ipc.SimpleRpcServer; @@ -71,9 +72,8 @@ public class TestServerLoadDurability { private static Configuration createConfigurationForSimpleRpcServer() { Configuration conf = HBaseConfiguration.create(); - conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, - SimpleRpcServer.class.getName()); - conf.setInt(ByteBufferPool.BUFFER_SIZE_KEY, 20); + conf.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName()); + conf.setInt(BUFFER_SIZE_KEY, 20); return conf; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 48080b2..32160a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -832,7 +832,7 @@ public class TestHFileBlock { if (ClassSize.is32BitJVM()) { assertEquals(64, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE); } else { - assertEquals(72, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE); + assertEquals(80, HFileBlock.MULTI_BYTE_BUFFER_HEAP_SIZE); } for (int size : new int[] { 100, 256, 12345 }) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java deleted file mode 100644 index 560190b..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcServer.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.nio.ByteBuffer; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.io.ByteBufferPool; -import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup; -import org.apache.hadoop.hbase.nio.ByteBuff; -import org.apache.hadoop.hbase.nio.MultiByteBuff; -import org.apache.hadoop.hbase.nio.SingleByteBuff; -import org.apache.hadoop.hbase.testclassification.RPCTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Pair; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ RPCTests.class, SmallTests.class }) -public class TestRpcServer { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestRpcServer.class); - - @Test - public void testAllocateByteBuffToReadInto() throws Exception { - int maxBuffersInPool = 10; - ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool); - initPoolWithAllBuffers(pool, maxBuffersInPool); - ByteBuff buff = null; - Pair<ByteBuff, CallCleanup> pair; - // When the request size is less than 1/6th of the pool buffer size. We should use on demand - // created on heap Buffer - pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), - 200); - buff = pair.getFirst(); - assertTrue(buff.hasArray()); - assertEquals(maxBuffersInPool, pool.getQueueSize()); - assertNull(pair.getSecond()); - // When the request size is > 1/6th of the pool buffer size. - pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), - 1024); - buff = pair.getFirst(); - assertFalse(buff.hasArray()); - assertEquals(maxBuffersInPool - 1, pool.getQueueSize()); - assertNotNull(pair.getSecond()); - pair.getSecond().run();// CallCleanup#run should put back the BB to pool. - assertEquals(maxBuffersInPool, pool.getQueueSize()); - // Request size> pool buffer size - pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), - 7 * 1024); - buff = pair.getFirst(); - assertFalse(buff.hasArray()); - assertTrue(buff instanceof MultiByteBuff); - ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers(); - assertEquals(2, bbs.length); - assertTrue(bbs[0].isDirect()); - assertTrue(bbs[1].isDirect()); - assertEquals(6 * 1024, bbs[0].limit()); - assertEquals(1024, bbs[1].limit()); - assertEquals(maxBuffersInPool - 2, pool.getQueueSize()); - assertNotNull(pair.getSecond()); - pair.getSecond().run();// CallCleanup#run should put back the BB to pool. - assertEquals(maxBuffersInPool, pool.getQueueSize()); - - pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), - 6 * 1024 + 200); - buff = pair.getFirst(); - assertFalse(buff.hasArray()); - assertTrue(buff instanceof MultiByteBuff); - bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers(); - assertEquals(2, bbs.length); - assertTrue(bbs[0].isDirect()); - assertFalse(bbs[1].isDirect()); - assertEquals(6 * 1024, bbs[0].limit()); - assertEquals(200, bbs[1].limit()); - assertEquals(maxBuffersInPool - 1, pool.getQueueSize()); - assertNotNull(pair.getSecond()); - pair.getSecond().run();// CallCleanup#run should put back the BB to pool. - assertEquals(maxBuffersInPool, pool.getQueueSize()); - - ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1]; - for (int i = 0; i < maxBuffersInPool - 1; i++) { - buffers[i] = pool.getBuffer(); - } - pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), - 20 * 1024); - buff = pair.getFirst(); - assertFalse(buff.hasArray()); - assertTrue(buff instanceof MultiByteBuff); - bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers(); - assertEquals(2, bbs.length); - assertTrue(bbs[0].isDirect()); - assertFalse(bbs[1].isDirect()); - assertEquals(6 * 1024, bbs[0].limit()); - assertEquals(14 * 1024, bbs[1].limit()); - assertEquals(0, pool.getQueueSize()); - assertNotNull(pair.getSecond()); - pair.getSecond().run();// CallCleanup#run should put back the BB to pool. - assertEquals(1, pool.getQueueSize()); - pool.getBuffer(); - pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool), - 7 * 1024); - buff = pair.getFirst(); - assertTrue(buff.hasArray()); - assertTrue(buff instanceof SingleByteBuff); - assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit()); - assertNull(pair.getSecond()); - } - - private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) { - ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool]; - // Just call getBuffer() on pool 'maxBuffersInPool' so as to init all buffers and then put back - // all. Makes pool with max #buffers. - for (int i = 0; i < maxBuffersInPool; i++) { - buffers[i] = pool.getBuffer(); - } - for (ByteBuffer buf : buffers) { - pool.putbackBuffer(buf); - } - } -}
