http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java deleted file mode 100644 index 1b4ad2a..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuffer.java +++ /dev/null @@ -1,1047 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.nio; - -import java.nio.BufferOverflowException; -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.nio.InvalidMarkException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.util.ByteBufferUtils; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.WritableUtils; - -/** - * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger - * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int, - * short, long etc and doing operations like mark, reset, slice etc. This has to be used when - * data is split across multiple byte buffers and we don't want copy them to single buffer - * for reading from it. - */ [email protected] -public class MultiByteBuffer { - - private final ByteBuffer[] items; - // Pointer to the current item in the MBB - private ByteBuffer curItem = null; - // Index of the current item in the MBB - private int curItemIndex = 0; - /** - * An indicator that helps in short circuiting some of the APIs functionality - * if the MBB is backed by single item - */ - private final boolean singleItem; - private int limit = 0; - private int limitedItemIndex; - private int markedItemIndex = -1; - private final int[] itemBeginPos; - - public MultiByteBuffer(ByteBuffer... items) { - assert items != null; - assert items.length > 0; - this.items = items; - this.curItem = this.items[this.curItemIndex]; - this.singleItem = items.length == 1; - // See below optimization in getInt(int) where we check whether the given index land in current - // item. For this we need to check whether the passed index is less than the next item begin - // offset. To handle this effectively for the last item buffer, we add an extra item into this - // array. - itemBeginPos = new int[items.length + 1]; - int offset = 0; - for (int i = 0; i < items.length; i++) { - ByteBuffer item = items[i]; - item.rewind(); - itemBeginPos[i] = offset; - int l = item.limit() - item.position(); - offset += l; - } - this.limit = offset; - this.itemBeginPos[items.length] = offset + 1; - this.limitedItemIndex = this.items.length - 1; - } - - private MultiByteBuffer(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex, - int curItemIndex, int markedIndex) { - this.items = items; - this.curItemIndex = curItemIndex; - this.curItem = this.items[this.curItemIndex]; - this.singleItem = items.length == 1; - this.itemBeginPos = itemBeginPos; - this.limit = limit; - this.limitedItemIndex = limitedIndex; - this.markedItemIndex = markedIndex; - } - - /** - * @return the underlying array if this MultiByteBuffer is made up of single on heap ByteBuffer. - * @throws UnsupportedOperationException - if the MBB is not made up of single item - * or if the single item is a Direct Byte Buffer - */ - public byte[] array() { - if (hasArray()) { - return this.curItem.array(); - } - throw new UnsupportedOperationException(); - } - - /** - * @return the array offset of the item ByteBuffer if the MBB is made up of - * single on heap ByteBuffer - * @throws UnsupportedOperationException if the MBB is not made up of single item or - * the single item is a Direct byte Buffer - */ - public int arrayOffset() { - if (hasArray()) { - return this.curItem.arrayOffset(); - } - throw new UnsupportedOperationException(); - } - - /** - * @return true if the MBB is made up of single item and that single item is an - * on heap Byte Buffer - */ - public boolean hasArray() { - return this.singleItem && this.curItem.hasArray(); - } - - /** - * @return the total capacity of this MultiByteBuffer. - */ - public int capacity() { - int c = 0; - for (ByteBuffer item : this.items) { - c += item.capacity(); - } - return c; - } - - /** - * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers - * @param index - * @return the byte at the given index - */ - public byte get(int index) { - if (singleItem) { - return this.curItem.get(index); - } - int itemIndex = getItemIndex(index); - return this.items[itemIndex].get(index - this.itemBeginPos[itemIndex]); - } - - /* - * Returns in which sub ByteBuffer, the given element index will be available. - */ - private int getItemIndex(int elemIndex) { - int index = 1; - while (elemIndex >= this.itemBeginPos[index]) { - index++; - if (index == this.itemBeginPos.length) { - throw new IndexOutOfBoundsException(); - } - } - return index - 1; - } - - /* - * Returns in which sub ByteBuffer, the given element index will be available. In this case we are - * sure that the item will be after MBB's current position - */ - private int getItemIndexFromCurItemIndex(int elemIndex) { - int index = this.curItemIndex; - while (elemIndex >= this.itemBeginPos[index]) { - index++; - if (index == this.itemBeginPos.length) { - throw new IndexOutOfBoundsException(); - } - } - return index - 1; - } - - /** - * Fetches the short at the given index. Does not change position of the underlying ByteBuffers - * @param index - * @return the short value at the given index - */ - public short getShort(int index) { - if (singleItem) { - return ByteBufferUtils.toShort(curItem, index); - } - // Mostly the index specified will land within this current item. Short circuit for that - int itemIndex; - if (this.itemBeginPos[this.curItemIndex] <= index - && this.itemBeginPos[this.curItemIndex + 1] > index) { - itemIndex = this.curItemIndex; - } else { - itemIndex = getItemIndex(index); - } - ByteBuffer item = items[itemIndex]; - int offsetInItem = index - this.itemBeginPos[itemIndex]; - if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) { - return ByteBufferUtils.toShort(item, offsetInItem); - } - if (items.length - 1 == itemIndex) { - // means cur item is the last one and we wont be able to read a int. Throw exception - throw new BufferUnderflowException(); - } - ByteBuffer nextItem = items[itemIndex + 1]; - // Get available one byte from this item and remaining one from next - short n = 0; - n ^= item.get(offsetInItem) & 0xFF; - n <<= 8; - n ^= nextItem.get(0) & 0xFF; - return n; - } - - /** - * Fetches the int at the given index. Does not change position of the underlying ByteBuffers - * @param index - * @return the int value at the given index - */ - public int getInt(int index) { - if (singleItem) { - return ByteBufferUtils.toInt(this.curItem, index); - } - // Mostly the index specified will land within this current item. Short circuit for that - int itemIndex; - if (this.itemBeginPos[this.curItemIndex] <= index - && this.itemBeginPos[this.curItemIndex + 1] > index) { - itemIndex = this.curItemIndex; - } else { - itemIndex = getItemIndex(index); - } - return getInt(index, itemIndex); - } - - /** - * Fetches the int at the given index. Does not change position of the underlying ByteBuffers. The - * difference for this API from {@link #getInt(int)} is the caller is sure that the index will be - * after the current position of this MBB. - * - * @param index - * @return the int value at the given index - */ - public int getIntStrictlyForward(int index) { - if (singleItem) { - return ByteBufferUtils.toInt(this.curItem, index); - } - // Mostly the index specified will land within this current item. Short circuit for that - int itemIndex; - if (this.itemBeginPos[this.curItemIndex + 1] > index) { - itemIndex = this.curItemIndex; - } else { - itemIndex = getItemIndexFromCurItemIndex(index); - } - return getInt(index, itemIndex); - } - - private int getInt(int index, int itemIndex) { - ByteBuffer item = items[itemIndex]; - int offsetInItem = index - this.itemBeginPos[itemIndex]; - int remainingLen = item.limit() - offsetInItem; - if (remainingLen >= Bytes.SIZEOF_INT) { - return ByteBufferUtils.toInt(item, offsetInItem); - } - if (items.length - 1 == itemIndex) { - // means cur item is the last one and we wont be able to read a int. Throw exception - throw new BufferUnderflowException(); - } - ByteBuffer nextItem = items[itemIndex + 1]; - // Get available bytes from this item and remaining from next - int l = 0; - for (int i = offsetInItem; i < item.capacity(); i++) { - l <<= 8; - l ^= item.get(i) & 0xFF; - } - for (int i = 0; i < Bytes.SIZEOF_INT - remainingLen; i++) { - l <<= 8; - l ^= nextItem.get(i) & 0xFF; - } - return l; - } - - /** - * Fetches the long at the given index. Does not change position of the underlying ByteBuffers - * @param index - * @return the long value at the given index - */ - public long getLong(int index) { - if (singleItem) { - return this.curItem.getLong(index); - } - // Mostly the index specified will land within this current item. Short circuit for that - int itemIndex; - if (this.itemBeginPos[this.curItemIndex] <= index - && this.itemBeginPos[this.curItemIndex + 1] > index) { - itemIndex = this.curItemIndex; - } else { - itemIndex = getItemIndex(index); - } - ByteBuffer item = items[itemIndex]; - int offsetInItem = index - this.itemBeginPos[itemIndex]; - int remainingLen = item.limit() - offsetInItem; - if (remainingLen >= Bytes.SIZEOF_LONG) { - return ByteBufferUtils.toLong(item, offsetInItem); - } - if (items.length - 1 == itemIndex) { - // means cur item is the last one and we wont be able to read a long. Throw exception - throw new BufferUnderflowException(); - } - ByteBuffer nextItem = items[itemIndex + 1]; - // Get available bytes from this item and remaining from next - long l = 0; - for (int i = offsetInItem; i < item.capacity(); i++) { - l <<= 8; - l ^= item.get(i) & 0xFF; - } - for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) { - l <<= 8; - l ^= nextItem.get(i) & 0xFF; - } - return l; - } - - /** - * @return this MBB's current position - */ - public int position() { - if (this.singleItem) return this.curItem.position(); - return itemBeginPos[this.curItemIndex] + this.curItem.position(); - } - - /** - * Sets this MBB's position to the given value. - * @param position - * @return this object - */ - public MultiByteBuffer position(int position) { - if (this.singleItem) { - this.curItem.position(position); - return this; - } - // Short circuit for positioning within the cur item. Mostly that is the case. - if (this.itemBeginPos[this.curItemIndex] <= position - && this.itemBeginPos[this.curItemIndex + 1] > position) { - this.curItem.position(position - this.itemBeginPos[this.curItemIndex]); - return this; - } - int itemIndex = getItemIndex(position); - // All items from 0 - curItem-1 set position at end. - for (int i = 0; i < itemIndex; i++) { - this.items[i].position(this.items[i].limit()); - } - // All items after curItem set position at begin - for (int i = itemIndex + 1; i < this.items.length; i++) { - this.items[i].position(0); - } - this.curItem = this.items[itemIndex]; - this.curItem.position(position - this.itemBeginPos[itemIndex]); - this.curItemIndex = itemIndex; - return this; - } - - /** - * Rewinds this MBB and the position is set to 0 - * @return this object - */ - public MultiByteBuffer rewind() { - for (int i = 0; i < this.items.length; i++) { - this.items[i].rewind(); - } - this.curItemIndex = 0; - this.curItem = this.items[this.curItemIndex]; - this.markedItemIndex = -1; - return this; - } - - /** - * Marks the current position of the MBB - * @return this object - */ - public MultiByteBuffer mark() { - this.markedItemIndex = this.curItemIndex; - this.curItem.mark(); - return this; - } - - /** - * Similar to {@link ByteBuffer}.reset(), ensures that this MBB - * is reset back to last marked position. - * @return This MBB - */ - public MultiByteBuffer reset() { - // when the buffer is moved to the next one.. the reset should happen on the previous marked - // item and the new one should be taken as the base - if (this.markedItemIndex < 0) throw new InvalidMarkException(); - ByteBuffer markedItem = this.items[this.markedItemIndex]; - markedItem.reset(); - this.curItem = markedItem; - // All items after the marked position upto the current item should be reset to 0 - for (int i = this.curItemIndex; i > this.markedItemIndex; i--) { - this.items[i].position(0); - } - this.curItemIndex = this.markedItemIndex; - return this; - } - - /** - * Returns the number of elements between the current position and the - * limit. - * @return the remaining elements in this MBB - */ - public int remaining() { - int remain = 0; - for (int i = curItemIndex; i < items.length; i++) { - remain += items[i].remaining(); - } - return remain; - } - - /** - * Returns true if there are elements between the current position and the limt - * @return true if there are elements, false otherwise - */ - public final boolean hasRemaining() { - return this.curItem.hasRemaining() || this.curItemIndex < this.items.length - 1; - } - - /** - * A relative method that returns byte at the current position. Increments the - * current position by the size of a byte. - * @return the byte at the current position - */ - public byte get() { - if (!singleItem && this.curItem.remaining() == 0) { - if (items.length - 1 == this.curItemIndex) { - // means cur item is the last one and we wont be able to read a long. Throw exception - throw new BufferUnderflowException(); - } - this.curItemIndex++; - this.curItem = this.items[this.curItemIndex]; - } - return this.curItem.get(); - } - - /** - * Returns the short value at the current position. Also advances the position by the size - * of short - * - * @return the short value at the current position - */ - public short getShort() { - if (singleItem) { - return this.curItem.getShort(); - } - int remaining = this.curItem.remaining(); - if (remaining >= Bytes.SIZEOF_SHORT) { - return this.curItem.getShort(); - } - if (remaining == 0) { - if (items.length - 1 == this.curItemIndex) { - // means cur item is the last one and we wont be able to read a long. Throw exception - throw new BufferUnderflowException(); - } - this.curItemIndex++; - this.curItem = this.items[this.curItemIndex]; - return this.curItem.getShort(); - } - short n = 0; - n ^= get() & 0xFF; - n <<= 8; - n ^= get() & 0xFF; - return n; - } - - /** - * Returns the int value at the current position. Also advances the position by the size of int - * - * @return the int value at the current position - */ - public int getInt() { - if (singleItem) { - return this.curItem.getInt(); - } - int remaining = this.curItem.remaining(); - if (remaining >= Bytes.SIZEOF_INT) { - return this.curItem.getInt(); - } - if (remaining == 0) { - if (items.length - 1 == this.curItemIndex) { - // means cur item is the last one and we wont be able to read a long. Throw exception - throw new BufferUnderflowException(); - } - this.curItemIndex++; - this.curItem = this.items[this.curItemIndex]; - return this.curItem.getInt(); - } - // Get available bytes from this item and remaining from next - int n = 0; - for (int i = 0; i < Bytes.SIZEOF_INT; i++) { - n <<= 8; - n ^= get() & 0xFF; - } - return n; - } - - - /** - * Returns the long value at the current position. Also advances the position by the size of long - * - * @return the long value at the current position - */ - public long getLong() { - if (singleItem) { - return this.curItem.getLong(); - } - int remaining = this.curItem.remaining(); - if (remaining >= Bytes.SIZEOF_LONG) { - return this.curItem.getLong(); - } - if (remaining == 0) { - if (items.length - 1 == this.curItemIndex) { - // means cur item is the last one and we wont be able to read a long. Throw exception - throw new BufferUnderflowException(); - } - this.curItemIndex++; - this.curItem = this.items[this.curItemIndex]; - return this.curItem.getLong(); - } - // Get available bytes from this item and remaining from next - long l = 0; - for (int i = 0; i < Bytes.SIZEOF_LONG; i++) { - l <<= 8; - l ^= get() & 0xFF; - } - return l; - } - - /** - * Returns the long value, stored as variable long at the current position of this - * MultiByteBuffer. Also advances it's position accordingly. - * This is similar to {@link WritableUtils#readVLong(DataInput)} but reads from a - * {@link MultiByteBuffer} - * - * @return the long value at the current position - */ - public long getVLong() { - byte firstByte = get(); - int len = WritableUtils.decodeVIntSize(firstByte); - if (len == 1) { - return firstByte; - } - long i = 0; - byte b; - for (int idx = 0; idx < len - 1; idx++) { - b = get(); - i = i << 8; - i = i | (b & 0xFF); - } - return (WritableUtils.isNegativeVInt(firstByte) ? (i ^ -1L) : i); - } - - /** - * Copies the content from this MBB's current position to the byte array and fills it. Also - * advances the position of the MBB by the length of the byte[]. - * @param dst - * @return this object - */ - public MultiByteBuffer get(byte[] dst) { - return get(dst, 0, dst.length); - } - - /** - * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset. - * Also advances the position of the MBB by the given length. - * @param dst - * @param offset within the current array - * @param length upto which the bytes to be copied - * @return this object - */ - public MultiByteBuffer get(byte[] dst, int offset, int length) { - if (this.singleItem) { - this.curItem.get(dst, offset, length); - } else { - while (length > 0) { - int toRead = Math.min(length, this.curItem.remaining()); - this.curItem.get(dst, offset, toRead); - length -= toRead; - if (length == 0) - break; - this.curItemIndex++; - this.curItem = this.items[this.curItemIndex]; - offset += toRead; - } - } - return this; - } - - /** - * Marks the limit of this MBB. - * @param limit - * @return This MBB - */ - public MultiByteBuffer limit(int limit) { - this.limit = limit; - if (singleItem) { - this.curItem.limit(limit); - return this; - } - // Normally the limit will try to limit within the last BB item - int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex]; - if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) { - this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin); - return this; - } - int itemIndex = getItemIndex(limit); - int beginOffset = this.itemBeginPos[itemIndex]; - int offsetInItem = limit - beginOffset; - ByteBuffer item = items[itemIndex]; - item.limit(offsetInItem); - for (int i = this.limitedItemIndex; i < itemIndex; i++) { - this.items[i].limit(this.items[i].capacity()); - } - this.limitedItemIndex = itemIndex; - for (int i = itemIndex + 1; i < this.items.length; i++) { - this.items[i].limit(this.items[i].position()); - } - return this; - } - - /** - * Returns the limit of this MBB - * @return limit of the MBB - */ - public int limit() { - return this.limit; - } - - /** - * Returns an MBB which is a sliced version of this MBB. The position, limit and mark - * of the new MBB will be independent than that of the original MBB. - * The content of the new MBB will start at this MBB's current position - * @return a sliced MBB - */ - public MultiByteBuffer slice() { - if (this.singleItem) { - return new MultiByteBuffer(curItem.slice()); - } - ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1]; - for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) { - copy[j] = this.items[i].slice(); - } - return new MultiByteBuffer(copy); - } - - /** - * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark - * of the new MBB will be independent than that of the original MBB. - * The content of the new MBB will start at this MBB's current position - * The position, limit and mark of the new MBB would be identical to this MBB in terms of - * values. - * @return a sliced MBB - */ - public MultiByteBuffer duplicate() { - if (this.singleItem) { - return new MultiByteBuffer(new ByteBuffer[] { curItem.duplicate() }, this.itemBeginPos, - this.limit, this.limitedItemIndex, this.curItemIndex, this.markedItemIndex); - } - ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length]; - for (int i = 0; i < this.items.length; i++) { - itemsCopy[i] = items[i].duplicate(); - } - return new MultiByteBuffer(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex, - this.curItemIndex, this.markedItemIndex); - } - - /** - * Writes a byte to this MBB at the current position and increments the position - * @param b - * @return this object - */ - public MultiByteBuffer put(byte b) { - if (!singleItem && this.curItem.remaining() == 0) { - if (this.curItemIndex == this.items.length - 1) { - throw new BufferOverflowException(); - } - this.curItemIndex++; - this.curItem = this.items[this.curItemIndex]; - } - this.curItem.put(b); - return this; - } - - /** - * Writes a byte to this MBB at the given index - * @param index - * @param b - * @return this object - */ - public MultiByteBuffer put(int index, byte b) { - if (this.singleItem) { - this.curItem.put(index, b); - return this; - } - int itemIndex = getItemIndex(limit); - ByteBuffer item = items[itemIndex]; - item.put(index - itemBeginPos[itemIndex], b); - return this; - } - - /** - * Copies from a src MBB to this MBB. - * @param offset the position in this MBB to which the copy should happen - * @param src the src MBB - * @param srcOffset the offset in the src MBB from where the elements should be read - * @param length the length upto which the copy should happen - */ - public void put(int offset, MultiByteBuffer src, int srcOffset, int length) { - if (src.hasArray() && this.hasArray()) { - System.arraycopy(src.array(), srcOffset + src.arrayOffset(), this.array(), this.arrayOffset() - + offset, length); - } else { - int destItemIndex = getItemIndex(offset); - int srcItemIndex = getItemIndex(srcOffset); - ByteBuffer destItem = this.items[destItemIndex]; - offset = offset - this.itemBeginPos[destItemIndex]; - - ByteBuffer srcItem = src.items[srcItemIndex]; - srcOffset = srcOffset - this.itemBeginPos[srcItemIndex]; - int toRead, toWrite, toMove; - while (length > 0) { - toWrite = destItem.limit() - offset; - toRead = srcItem.limit() - srcOffset; - toMove = Math.min(length, Math.min(toRead, toWrite)); - ByteBufferUtils.copyFromBufferToBuffer(destItem, srcItem, srcOffset, offset, toMove); - length -= toMove; - if (length == 0) break; - if (toRead < toWrite) { - srcItem = src.items[++srcItemIndex]; - srcOffset = 0; - offset += toMove; - } else if (toRead > toWrite) { - destItem = this.items[++destItemIndex]; - offset = 0; - srcOffset += toMove; - } else { - // toRead = toWrite case - srcItem = src.items[++srcItemIndex]; - srcOffset = 0; - destItem = this.items[++destItemIndex]; - offset = 0; - } - } - } - } - - /** - * Writes an int to this MBB at its current position. Also advances the position by size of int - * @param val Int value to write - * @return this object - */ - public MultiByteBuffer putInt(int val) { - if (singleItem || this.curItem.remaining() >= Bytes.SIZEOF_INT) { - this.curItem.putInt(val); - return this; - } - if (this.curItemIndex == this.items.length - 1) { - throw new BufferOverflowException(); - } - // During read, we will read as byte by byte for this case. So just write in Big endian - put(int3(val)); - put(int2(val)); - put(int1(val)); - put(int0(val)); - return this; - } - - private static byte int3(int x) { - return (byte) (x >> 24); - } - - private static byte int2(int x) { - return (byte) (x >> 16); - } - - private static byte int1(int x) { - return (byte) (x >> 8); - } - - private static byte int0(int x) { - return (byte) (x); - } - - /** - * Copies from the given byte[] to this MBB - * @param src - * @return this MBB - */ - public final MultiByteBuffer put(byte[] src) { - return put(src, 0, src.length); - } - - /** - * Copies from the given byte[] to this MBB - * @param src - * @param offset the position in the byte array from which the copy should be done - * @param length the length upto which the copy should happen - * @return this MBB - */ - public MultiByteBuffer put(byte[] src, int offset, int length) { - if (singleItem || this.curItem.remaining() >= length) { - ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length); - return this; - } - int end = offset + length; - for (int i = offset; i < end; i++) { - this.put(src[i]); - } - return this; - } - - - /** - * Writes a long to this MBB at its current position. Also advances the position by size of long - * @param val Long value to write - * @return this object - */ - public MultiByteBuffer putLong(long val) { - if (singleItem || this.curItem.remaining() >= Bytes.SIZEOF_LONG) { - this.curItem.putLong(val); - return this; - } - if (this.curItemIndex == this.items.length - 1) { - throw new BufferOverflowException(); - } - // During read, we will read as byte by byte for this case. So just write in Big endian - put(long7(val)); - put(long6(val)); - put(long5(val)); - put(long4(val)); - put(long3(val)); - put(long2(val)); - put(long1(val)); - put(long0(val)); - return this; - } - - private static byte long7(long x) { - return (byte) (x >> 56); - } - - private static byte long6(long x) { - return (byte) (x >> 48); - } - - private static byte long5(long x) { - return (byte) (x >> 40); - } - - private static byte long4(long x) { - return (byte) (x >> 32); - } - - private static byte long3(long x) { - return (byte) (x >> 24); - } - - private static byte long2(long x) { - return (byte) (x >> 16); - } - - private static byte long1(long x) { - return (byte) (x >> 8); - } - - private static byte long0(long x) { - return (byte) (x); - } - - /** - * Jumps the current position of this MBB by specified length. - * @param length - */ - public void skip(int length) { - if (this.singleItem) { - this.curItem.position(this.curItem.position() + length); - return; - } - // Get available bytes from this item and remaining from next - int jump = 0; - while (true) { - jump = this.curItem.remaining(); - if (jump >= length) { - this.curItem.position(this.curItem.position() + length); - break; - } - this.curItem.position(this.curItem.position() + jump); - length -= jump; - this.curItemIndex++; - this.curItem = this.items[this.curItemIndex]; - } - } - - /** - * Jumps back the current position of this MBB by specified length. - * @param length - */ - public void moveBack(int length) { - if (this.singleItem) { - this.curItem.position(curItem.position() - length); - return; - } - while (length != 0) { - if (length > curItem.position()) { - length -= curItem.position(); - this.curItem.position(0); - this.curItemIndex--; - this.curItem = this.items[curItemIndex]; - } else { - this.curItem.position(curItem.position() - length); - break; - } - } - } - - /** - * Returns bytes from current position till length specified, as a single ByteButter. When all - * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item - * as such will be returned. So users are warned not to change the position or limit of this - * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required - * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy - * the bytes to a newly created ByteBuffer of required size and return that. - * - * @param length number of bytes required. - * @return bytes from current position till length specified, as a single ByteButter. - */ - public ByteBuffer asSubBuffer(int length) { - if (this.singleItem || this.curItem.remaining() >= length) { - return this.curItem; - } - int offset = 0; - byte[] dupB = new byte[length]; - int locCurItemIndex = curItemIndex; - ByteBuffer locCurItem = curItem; - while (length > 0) { - int toRead = Math.min(length, locCurItem.remaining()); - ByteBufferUtils - .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead); - length -= toRead; - if (length == 0) - break; - locCurItemIndex++; - locCurItem = this.items[locCurItemIndex]; - offset += toRead; - } - return ByteBuffer.wrap(dupB); - } - - /** - * Returns bytes from given offset till length specified, as a single ByteButter. When all these - * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as - * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are - * warned not to change the position or limit of this returned ByteBuffer. When the required bytes - * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created - * ByteBuffer of required size and return that. - * - * @param offset the offset in this MBB from where the subBuffer should be created - * @param length the length of the subBuffer - * @return a pair of bytes from current position till length specified, as a single ByteButter and - * offset in that Buffer where the bytes starts. - */ - public Pair<ByteBuffer, Integer> asSubBuffer(int offset, int length) { - if (this.singleItem) { - return new Pair<ByteBuffer, Integer>(this.curItem, offset); - } - if (this.itemBeginPos[this.curItemIndex] <= offset) { - int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex]; - if (this.curItem.limit() - relOffsetInCurItem >= length) { - return new Pair<ByteBuffer, Integer>(this.curItem, relOffsetInCurItem); - } - } - int itemIndex = getItemIndex(offset); - ByteBuffer item = this.items[itemIndex]; - offset = offset - this.itemBeginPos[itemIndex]; - if (item.limit() - offset >= length) { - return new Pair<ByteBuffer, Integer>(item, offset); - } - byte[] dst = new byte[length]; - int destOffset = 0; - while (length > 0) { - int toRead = Math.min(length, item.limit() - offset); - ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead); - length -= toRead; - if (length == 0) break; - itemIndex++; - item = this.items[itemIndex]; - destOffset += toRead; - offset = 0; - } - return new Pair<ByteBuffer, Integer>(ByteBuffer.wrap(dst), 0); - } - - /** - * Compares two MBBs - * - * @param buf1 the first MBB - * @param o1 the offset in the first MBB from where the compare has to happen - * @param len1 the length in the first MBB upto which the compare has to happen - * @param buf2 the second MBB - * @param o2 the offset in the second MBB from where the compare has to happen - * @param len2 the length in the second MBB upto which the compare has to happen - * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is - * smaller than buf2. - */ - public static int compareTo(MultiByteBuffer buf1, int o1, int len1, MultiByteBuffer buf2, int o2, - int len2) { - if (buf1.hasArray() && buf2.hasArray()) { - return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(), - buf2.arrayOffset() + o2, len2); - } - int end1 = o1 + len1; - int end2 = o2 + len2; - for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) { - int a = buf1.get(i) & 0xFF; - int b = buf2.get(j) & 0xFF; - if (a != b) { - return a - b; - } - } - return len1 - len2; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof MultiByteBuffer)) return false; - if (this == obj) return true; - MultiByteBuffer that = (MultiByteBuffer) obj; - if (this.capacity() != that.capacity()) return false; - if (compareTo(this, 0, this.capacity(), that, 0, this.capacity()) == 0) return true; - return false; - } - - @Override - public int hashCode() { - int hash = 0; - for (ByteBuffer b : this.items) { - hash += b.hashCode(); - } - return hash; - } -}
http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java new file mode 100644 index 0000000..62601f3 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.nio; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Pair; + +/** + * An implementation of ByteBuff where a single BB backs the BBI. This just acts + * as a wrapper over a normal BB - offheap or onheap + */ [email protected] +public class SingleByteBuff extends ByteBuff { + + // Underlying BB + private final ByteBuffer buf; + + public SingleByteBuff(ByteBuffer buf) { + this.buf = buf; + } + + @Override + public int position() { + return this.buf.position(); + } + + @Override + public SingleByteBuff position(int position) { + this.buf.position(position); + return this; + } + + @Override + public SingleByteBuff skip(int len) { + this.buf.position(this.buf.position() + len); + return this; + } + + @Override + public SingleByteBuff moveBack(int len) { + this.buf.position(this.buf.position() - len); + return this; + } + + @Override + public int capacity() { + return this.buf.capacity(); + } + + @Override + public int limit() { + return this.buf.limit(); + } + + @Override + public SingleByteBuff limit(int limit) { + this.buf.limit(limit); + return this; + } + + @Override + public SingleByteBuff rewind() { + this.buf.rewind(); + return this; + } + + @Override + public SingleByteBuff mark() { + this.buf.mark(); + return this; + } + + @Override + public ByteBuffer asSubByteBuffer(int length) { + // Just return the single BB that is available + return this.buf; + } + + @Override + public void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair) { + // Just return the single BB that is available + pair.setFirst(this.buf); + pair.setSecond(offset); + } + + @Override + public int remaining() { + return this.buf.remaining(); + } + + @Override + public boolean hasRemaining() { + return buf.hasRemaining(); + } + + @Override + public SingleByteBuff reset() { + this.buf.reset(); + return this; + } + + @Override + public SingleByteBuff slice() { + return new SingleByteBuff(this.buf.slice()); + } + + @Override + public SingleByteBuff duplicate() { + return new SingleByteBuff(this.buf.duplicate()); + } + + @Override + public byte get() { + return buf.get(); + } + + @Override + public byte get(int index) { + return ByteBufferUtils.toByte(this.buf, index); + } + + @Override + public byte getByteStrictlyForward(int index) { + if (index < this.buf.position()) { + throw new IndexOutOfBoundsException("The index " + index + + " should not be less than current position " + this.position()); + } + return ByteBufferUtils.toByte(this.buf, index); + } + + @Override + public SingleByteBuff put(byte b) { + this.buf.put(b); + return this; + } + + @Override + public SingleByteBuff put(int index, byte b) { + buf.put(index, b); + return this; + } + + @Override + public void get(byte[] dst, int offset, int length) { + ByteBufferUtils.copyFromBufferToArray(dst, buf, buf.position(), offset, length); + buf.position(buf.position() + length); + } + + @Override + public void get(byte[] dst) { + get(dst, 0, dst.length); + } + + @Override + public SingleByteBuff put(int offset, ByteBuff src, int srcOffset, int length) { + if (src instanceof SingleByteBuff) { + ByteBufferUtils.copyFromBufferToBuffer(((SingleByteBuff) src).buf, this.buf, srcOffset, + offset, length); + } else { + // TODO we can do some optimization here? Call to asSubByteBuffer might + // create a copy. + Pair<ByteBuffer, Integer> pair = new Pair<ByteBuffer, Integer>(); + src.asSubByteBuffer(srcOffset, length, pair); + ByteBufferUtils.copyFromBufferToBuffer(pair.getFirst(), this.buf, pair.getSecond(), offset, + length); + } + return this; + } + + @Override + public SingleByteBuff put(byte[] src, int offset, int length) { + ByteBufferUtils.copyFromArrayToBuffer(this.buf, src, offset, length); + return this; + } + + @Override + public SingleByteBuff put(byte[] src) { + return put(src, 0, src.length); + } + + @Override + public boolean hasArray() { + return this.buf.hasArray(); + } + + @Override + public byte[] array() { + return this.buf.array(); + } + + @Override + public int arrayOffset() { + return this.buf.arrayOffset(); + } + + @Override + public short getShort() { + return this.buf.getShort(); + } + + @Override + public short getShort(int index) { + return ByteBufferUtils.toShort(this.buf, index); + } + + @Override + public short getShortStrictlyForward(int index) { + if (index < this.buf.position()) { + throw new IndexOutOfBoundsException("The index " + index + + " should not be less than current position " + this.position()); + } + return ByteBufferUtils.toShort(this.buf, index); + } + + @Override + public int getInt() { + return this.buf.getInt(); + } + + @Override + public SingleByteBuff putInt(int value) { + ByteBufferUtils.putInt(this.buf, value); + return this; + } + + @Override + public int getInt(int index) { + return ByteBufferUtils.toInt(this.buf, index); + } + + @Override + public int getIntStrictlyForward(int index) { + if (index < this.buf.position()) { + throw new IndexOutOfBoundsException("The index " + index + + " should not be less than current position " + this.position()); + } + return ByteBufferUtils.toInt(this.buf, index); + } + + @Override + public long getLong() { + return this.buf.getLong(); + } + + @Override + public SingleByteBuff putLong(long value) { + ByteBufferUtils.putLong(this.buf, value); + return this; + } + + @Override + public long getLong(int index) { + return ByteBufferUtils.toLong(this.buf, index); + } + + @Override + public long getLongStrictlyForward(int index) { + if (index < this.buf.position()) { + throw new IndexOutOfBoundsException("The index " + index + + " should not be less than current position " + this.position()); + } + return ByteBufferUtils.toLong(this.buf, index); + } + + @Override + public byte[] toBytes(int offset, int length) { + byte[] output = new byte[length]; + ByteBufferUtils.copyFromBufferToArray(output, buf, offset, 0, length); + return output; + } + + @Override + public void get(ByteBuffer out, int sourceOffset, int length) { + ByteBufferUtils.copyFromBufferToBuffer(buf, out, sourceOffset, length); + } + + @Override + public boolean equals(Object obj) { + if(!(obj instanceof SingleByteBuff)) return false; + return this.buf.equals(((SingleByteBuff)obj).buf); + } + + @Override + public int hashCode() { + return this.buf.hashCode(); + } + + /** + * @return the ByteBuffer which this wraps. + */ + ByteBuffer getEnclosingByteBuffer() { + return this.buf; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index d3414dd..986d6e0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -25,6 +25,9 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.util.StringUtils; /** @@ -200,4 +203,65 @@ public final class ByteBufferArray { } assert srcIndex == len; } + + /** + * Creates a ByteBuff from a given array of ByteBuffers from the given offset to the + * length specified. For eg, if there are 4 buffers forming an array each with length 10 and + * if we call asSubBuffer(5, 10) then we will create an MBB consisting of two BBs + * and the first one be a BB from 'position' 5 to a 'length' 5 and the 2nd BB will be from + * 'position' 0 to 'length' 5. + * @param offset + * @param len + * @return a ByteBuff formed from the underlying ByteBuffers + */ + public ByteBuff asSubByteBuff(long offset, int len) { + assert len >= 0; + long end = offset + len; + int startBuffer = (int) (offset / bufferSize), startBufferOffset = (int) (offset % bufferSize); + int endBuffer = (int) (end / bufferSize), endBufferOffset = (int) (end % bufferSize); + assert startBuffer >= 0 && startBuffer < bufferCount; + assert endBuffer >= 0 && endBuffer < bufferCount + || (endBuffer == bufferCount && endBufferOffset == 0); + if (startBuffer >= locks.length || startBuffer < 0) { + String msg = "Failed subArray, start=" + offset + ",startBuffer=" + startBuffer + + ",bufferSize=" + bufferSize; + LOG.error(msg); + throw new RuntimeException(msg); + } + int srcIndex = 0, cnt = -1; + ByteBuffer[] mbb = new ByteBuffer[endBuffer - startBuffer + 1]; + for (int i = startBuffer,j=0; i <= endBuffer; ++i,j++) { + Lock lock = locks[i]; + lock.lock(); + try { + ByteBuffer bb = buffers[i]; + if (i == startBuffer) { + cnt = bufferSize - startBufferOffset; + if (cnt > len) cnt = len; + ByteBuffer dup = bb.duplicate(); + dup.limit(startBufferOffset + cnt).position(startBufferOffset); + mbb[j] = dup.slice(); + } else if (i == endBuffer) { + cnt = endBufferOffset; + ByteBuffer dup = bb.duplicate(); + dup.position(0).limit(cnt); + mbb[j] = dup.slice(); + } else { + cnt = bufferSize ; + ByteBuffer dup = bb.duplicate(); + dup.position(0).limit(cnt); + mbb[j] = dup.slice(); + } + srcIndex += cnt; + } finally { + lock.unlock(); + } + } + assert srcIndex == len; + if (mbb.length > 1) { + return new MultiByteBuff(mbb); + } else { + return new SingleByteBuff(mbb[0]); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 7bb72a0..1fb5991 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -41,9 +41,9 @@ import sun.nio.ch.DirectBuffer; public final class ByteBufferUtils { // "Compressed integer" serialization helper constants. - private final static int VALUE_MASK = 0x7f; - private final static int NEXT_BIT_SHIFT = 7; - private final static int NEXT_BIT_MASK = 1 << 7; + public final static int VALUE_MASK = 0x7f; + public final static int NEXT_BIT_SHIFT = 7; + public final static int NEXT_BIT_MASK = 1 << 7; private ByteBufferUtils() { } @@ -139,6 +139,14 @@ public final class ByteBufferUtils { } } + public static byte toByte(ByteBuffer buffer, int offset) { + if (UnsafeAccess.isAvailable()) { + return UnsafeAccess.toByte(buffer, offset); + } else { + return buffer.get(offset); + } + } + /** * Copy the data to the output stream and update position in buffer. * @param out the stream to write bytes to @@ -182,6 +190,15 @@ public final class ByteBufferUtils { return fitInBytes; } + public static int putByte(ByteBuffer buffer, int offset, byte b) { + if (UnsafeAccess.isAvailable()) { + return UnsafeAccess.putByte(buffer, offset, b); + } else { + buffer.put(offset, b); + return offset + 1; + } + } + /** * Check how many bytes are required to store value. * @param value Value which size will be tested. @@ -334,30 +351,6 @@ public final class ByteBufferUtils { } /** - * Copy from one buffer to another from given offset. - * <p> - * Note : This will advance the position marker of {@code out} but not change the position maker - * for {@code in} - * @param out destination buffer - * @param in source buffer - * @param sourceOffset offset in the source buffer - * @param length how many bytes to copy - */ - public static void copyFromBufferToBuffer(ByteBuffer out, - ByteBuffer in, int sourceOffset, int length) { - if (in.hasArray() && out.hasArray()) { - System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), - out.array(), out.position() + - out.arrayOffset(), length); - skip(out, length); - } else { - for (int i = 0; i < length; ++i) { - out.put(in.get(sourceOffset + i)); - } - } - } - - /** * Copy one buffer's whole data to another. Write starts at the current position of 'out' buffer. * Note : This will advance the position marker of {@code out} but not change the position maker * for {@code in}. The position and limit of the {@code in} buffer to be set properly by caller. @@ -377,22 +370,51 @@ public final class ByteBufferUtils { /** * Copy from one buffer to another from given offset. This will be absolute positional copying and * won't affect the position of any of the buffers. - * @param out * @param in + * @param out * @param sourceOffset * @param destinationOffset * @param length */ - public static void copyFromBufferToBuffer(ByteBuffer out, ByteBuffer in, int sourceOffset, + public static int copyFromBufferToBuffer(ByteBuffer in, ByteBuffer out, int sourceOffset, int destinationOffset, int length) { if (in.hasArray() && out.hasArray()) { System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.arrayOffset() + destinationOffset, length); + } else if (UnsafeAccess.isAvailable()) { + UnsafeAccess.copy(in, sourceOffset, out, destinationOffset, length); + } else { + for (int i = 0; i < length; ++i) { + putByte(out, destinationOffset + i, toByte(in, sourceOffset + i)); + } + } + return destinationOffset + length; + } + + /** + * Copy from one buffer to another from given offset. + * <p> + * Note : This will advance the position marker of {@code out} but not change the position maker + * for {@code in} + * @param in source buffer + * @param out destination buffer + * @param sourceOffset offset in the source buffer + * @param length how many bytes to copy + */ + public static void copyFromBufferToBuffer(ByteBuffer in, + ByteBuffer out, int sourceOffset, int length) { + if (in.hasArray() && out.hasArray()) { + System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out.array(), out.position() + + out.arrayOffset(), length); + } else if (UnsafeAccess.isAvailable()) { + UnsafeAccess.copy(in, sourceOffset, out, out.position(), length); } else { + int destOffset = out.position(); for (int i = 0; i < length; ++i) { - out.put((destinationOffset + i), in.get(sourceOffset + i)); + putByte(out, destOffset + i, toByte(in, sourceOffset + i)); } } + skip(out, length); } /** @@ -736,6 +758,35 @@ public final class ByteBufferUtils { } /** + * Put a short value out to the given ByteBuffer's current position in big-endian format. + * This also advances the position in buffer by short size. + * @param buffer the ByteBuffer to write to + * @param val short to write out + */ + public static void putShort(ByteBuffer buffer, short val) { + if (UnsafeAccess.isAvailable()) { + int newPos = UnsafeAccess.putShort(buffer, buffer.position(), val); + buffer.position(newPos); + } else { + buffer.putShort(val); + } + } + + /** + * Put a long value out to the given ByteBuffer's current position in big-endian format. + * This also advances the position in buffer by long size. + * @param buffer the ByteBuffer to write to + * @param val long to write out + */ + public static void putLong(ByteBuffer buffer, long val) { + if (UnsafeAccess.isAvailable()) { + int newPos = UnsafeAccess.putLong(buffer, buffer.position(), val); + buffer.position(newPos); + } else { + buffer.putLong(val); + } + } + /** * Copies the bytes from given array's offset to length part into the given buffer. Puts the bytes * to buffer's current position. This also advances the position in the 'out' buffer by 'length' * @param out @@ -758,15 +809,16 @@ public final class ByteBufferUtils { } /** - * Copies specified number of bytes from given offset of 'in' ByteBuffer to the array. + * Copies specified number of bytes from given offset of 'in' ByteBuffer to + * the array. * @param out * @param in * @param sourceOffset * @param destinationOffset * @param length */ - public static void copyFromBufferToArray(byte[] out, ByteBuffer in, - int sourceOffset, int destinationOffset, int length) { + public static void copyFromBufferToArray(byte[] out, ByteBuffer in, int sourceOffset, + int destinationOffset, int length) { if (in.hasArray()) { System.arraycopy(in.array(), sourceOffset + in.arrayOffset(), out, destinationOffset, length); } else if (UnsafeAccess.isAvailable()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java index 34d9f90..aa0795d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Hash.java @@ -140,4 +140,6 @@ public abstract class Hash { * @return hash value */ public abstract int hash(byte[] bytes, int offset, int length, int initval); + + // TODO : a buffer based hash function would be needed.. Not adding it for now } http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java index ea13dbc..0cccee6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java @@ -343,4 +343,72 @@ public final class UnsafeAccess { } theUnsafe.copyMemory(srcBase, srcAddress, destBase, destAddress, length); } + // APIs to add primitives to BBs + /** + * Put a short value out to the specified BB position in big-endian format. + * @param buf the byte buffer + * @param offset position in the buffer + * @param val short to write out + * @return incremented offset + */ + public static int putShort(ByteBuffer buf, int offset, short val) { + if (littleEndian) { + val = Short.reverseBytes(val); + } + if (buf.isDirect()) { + theUnsafe.putShort(((DirectBuffer) buf).address() + offset, val); + } else { + theUnsafe.putShort(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val); + } + return offset + Bytes.SIZEOF_SHORT; + } + + /** + * Put a long value out to the specified BB position in big-endian format. + * @param buf the byte buffer + * @param offset position in the buffer + * @param val long to write out + * @return incremented offset + */ + public static int putLong(ByteBuffer buf, int offset, long val) { + if (littleEndian) { + val = Long.reverseBytes(val); + } + if (buf.isDirect()) { + theUnsafe.putLong(((DirectBuffer) buf).address() + offset, val); + } else { + theUnsafe.putLong(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, val); + } + return offset + Bytes.SIZEOF_LONG; + } + /** + * Put a byte value out to the specified BB position in big-endian format. + * @param buf the byte buffer + * @param offset position in the buffer + * @param b byte to write out + * @return incremented offset + */ + public static int putByte(ByteBuffer buf, int offset, byte b) { + if (buf.isDirect()) { + theUnsafe.putByte(((DirectBuffer) buf).address() + offset, b); + } else { + theUnsafe.putByte(buf.array(), + BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset, b); + } + return offset + 1; + } + + /** + * Returns the byte at the given offset + * @param buf the buffer to read + * @param offset the offset at which the byte has to be read + * @return the byte at the given offset + */ + public static byte toByte(ByteBuffer buf, int offset) { + if (buf.isDirect()) { + return theUnsafe.getByte(((DirectBuffer) buf).address() + offset); + } else { + return theUnsafe.getByte(buf.array(), BYTE_ARRAY_BASE_OFFSET + buf.arrayOffset() + offset); + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java deleted file mode 100644 index 30fb71e..0000000 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestByteBufferInputStream.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import static org.junit.Assert.assertEquals; - -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.nio.ByteBuffer; - -import org.apache.hadoop.hbase.testclassification.IOTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ IOTests.class, SmallTests.class }) -public class TestByteBufferInputStream { - - @Test - public void testReads() throws Exception { - ByteArrayOutputStream bos = new ByteArrayOutputStream(100); - DataOutputStream dos = new DataOutputStream(bos); - String s = "test"; - int i = 128; - dos.write(1); - dos.writeInt(i); - dos.writeBytes(s); - dos.writeLong(12345L); - dos.writeShort(2); - dos.flush(); - ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray()); - - // bbis contains 19 bytes - // 1 byte, 4 bytes int, 4 bytes string, 8 bytes long and 2 bytes short - ByteBufferInputStream bbis = new ByteBufferInputStream(bb); - assertEquals(15 + s.length(), bbis.available()); - assertEquals(1, bbis.read()); - byte[] ib = new byte[4]; - bbis.read(ib); - assertEquals(i, Bytes.toInt(ib)); - byte[] sb = new byte[s.length()]; - bbis.read(sb); - assertEquals(s, Bytes.toString(sb)); - byte[] lb = new byte[8]; - bbis.read(lb); - assertEquals(12345, Bytes.toLong(lb)); - assertEquals(2, bbis.available()); - ib = new byte[4]; - int read = bbis.read(ib, 0, ib.length); - // We dont have 4 bytes remainig but only 2. So onlt those should be returned back - assertEquals(2, read); - assertEquals(2, Bytes.toShort(ib)); - assertEquals(0, bbis.available()); - // At end. The read() should return -1 - assertEquals(-1, bbis.read()); - bbis.close(); - - bb = ByteBuffer.wrap(bos.toByteArray()); - bbis = new ByteBufferInputStream(bb); - DataInputStream dis = new DataInputStream(bbis); - dis.read(); - assertEquals(i, dis.readInt()); - dis.close(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestMultiByteBuffInputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestMultiByteBuffInputStream.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestMultiByteBuffInputStream.java new file mode 100644 index 0000000..ed96e87 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/io/TestMultiByteBuffInputStream.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ IOTests.class, SmallTests.class }) +public class TestMultiByteBuffInputStream { + + @Test + public void testReads() throws Exception { + ByteArrayOutputStream bos = new ByteArrayOutputStream(100); + DataOutputStream dos = new DataOutputStream(bos); + String s = "test"; + int i = 128; + dos.write(1); + dos.writeInt(i); + dos.writeBytes(s); + dos.writeLong(12345L); + dos.writeShort(2); + dos.flush(); + ByteBuffer bb = ByteBuffer.wrap(bos.toByteArray()); + + // bbis contains 19 bytes + // 1 byte, 4 bytes int, 4 bytes string, 8 bytes long and 2 bytes short + ByteBuffInputStream bbis = new ByteBuffInputStream(new MultiByteBuff(bb)); + assertEquals(15 + s.length(), bbis.available()); + assertEquals(1, bbis.read()); + byte[] ib = new byte[4]; + bbis.read(ib); + assertEquals(i, Bytes.toInt(ib)); + byte[] sb = new byte[s.length()]; + bbis.read(sb); + assertEquals(s, Bytes.toString(sb)); + byte[] lb = new byte[8]; + bbis.read(lb); + assertEquals(12345, Bytes.toLong(lb)); + assertEquals(2, bbis.available()); + ib = new byte[4]; + int read = bbis.read(ib, 0, ib.length); + // We dont have 4 bytes remainig but only 2. So onlt those should be returned back + assertEquals(2, read); + assertEquals(2, Bytes.toShort(ib)); + assertEquals(0, bbis.available()); + // At end. The read() should return -1 + assertEquals(-1, bbis.read()); + bbis.close(); + + bb = ByteBuffer.wrap(bos.toByteArray()); + bbis = new ByteBuffInputStream(new MultiByteBuff(bb)); + DataInputStream dis = new DataInputStream(bbis); + dis.read(); + assertEquals(i, dis.readInt()); + dis.close(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java new file mode 100644 index 0000000..588e946 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuff.java @@ -0,0 +1,324 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.nio; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MiscTests.class, SmallTests.class }) +public class TestMultiByteBuff { + + @Test + public void testWritesAndReads() { + // Absolute reads + ByteBuffer bb1 = ByteBuffer.allocate(15); + ByteBuffer bb2 = ByteBuffer.allocate(15); + int i1 = 4; + bb1.putInt(i1); + long l1 = 45L, l2 = 100L, l3 = 12345L; + bb1.putLong(l1); + short s1 = 2; + bb1.putShort(s1); + byte[] b = Bytes.toBytes(l2); + bb1.put(b, 0, 1); + bb2.put(b, 1, 7); + bb2.putLong(l3); + MultiByteBuff mbb = new MultiByteBuff(bb1, bb2); + assertEquals(l1, mbb.getLong(4)); + assertEquals(l2, mbb.getLong(14)); + assertEquals(l3, mbb.getLong(22)); + assertEquals(i1, mbb.getInt(0)); + assertEquals(s1, mbb.getShort(12)); + // Relative reads + assertEquals(i1, mbb.getInt()); + assertEquals(l1, mbb.getLong()); + assertEquals(s1, mbb.getShort()); + assertEquals(l2, mbb.getLong()); + assertEquals(l3, mbb.getLong()); + // Absolute writes + bb1 = ByteBuffer.allocate(15); + bb2 = ByteBuffer.allocate(15); + mbb = new MultiByteBuff(bb1, bb2); + byte b1 = 5, b2 = 31; + mbb.put(b1); + mbb.putLong(l1); + mbb.putInt(i1); + mbb.putLong(l2); + mbb.put(b2); + mbb.position(mbb.position() + 2); + try { + mbb.putLong(l3); + fail("'Should have thrown BufferOverflowException"); + } catch (BufferOverflowException e) { + } + mbb.position(mbb.position() - 2); + mbb.putLong(l3); + mbb.rewind(); + assertEquals(b1, mbb.get()); + assertEquals(l1, mbb.getLong()); + assertEquals(i1, mbb.getInt()); + assertEquals(l2, mbb.getLong()); + assertEquals(b2, mbb.get()); + assertEquals(l3, mbb.getLong()); + mbb.put(21, b1); + mbb.position(21); + assertEquals(b1, mbb.get()); + mbb.put(b); + assertEquals(l2, mbb.getLong(22)); + try { + // This should fail because we have already move to a position + // greater than 22 + mbb.getLongStrictlyForward(22); + fail(); + } catch (IndexOutOfBoundsException e) { + } + } + + @Test + public void testPutPrimitives() { + ByteBuffer bb = ByteBuffer.allocate(10); + SingleByteBuff s = new SingleByteBuff(bb); + s.putLong(-4465109508325701663l); + bb.rewind(); + long long1 = bb.getLong(); + assertEquals(long1, -4465109508325701663l); + s.position(8); + } + + @Test + public void testArrayBasedMethods() { + byte[] b = new byte[15]; + ByteBuffer bb1 = ByteBuffer.wrap(b, 1, 10).slice(); + ByteBuffer bb2 = ByteBuffer.allocate(15); + ByteBuff mbb1 = new MultiByteBuff(bb1, bb2); + assertFalse(mbb1.hasArray()); + try { + mbb1.array(); + fail(); + } catch (UnsupportedOperationException e) { + } + try { + mbb1.arrayOffset(); + fail(); + } catch (UnsupportedOperationException e) { + } + mbb1 = new SingleByteBuff(bb1); + assertTrue(mbb1.hasArray()); + assertEquals(1, mbb1.arrayOffset()); + assertEquals(b, mbb1.array()); + mbb1 = new SingleByteBuff(ByteBuffer.allocateDirect(10)); + assertFalse(mbb1.hasArray()); + try { + mbb1.array(); + fail(); + } catch (UnsupportedOperationException e) { + } + try { + mbb1.arrayOffset(); + fail(); + } catch (UnsupportedOperationException e) { + } + } + + @Test + public void testMarkAndResetWithMBB() { + ByteBuffer bb1 = ByteBuffer.allocateDirect(15); + ByteBuffer bb2 = ByteBuffer.allocateDirect(15); + bb1.putInt(4); + long l1 = 45L, l2 = 100L, l3 = 12345L; + bb1.putLong(l1); + bb1.putShort((short) 2); + byte[] b = Bytes.toBytes(l2); + bb1.put(b, 0, 1); + bb2.put(b, 1, 7); + bb2.putLong(l3); + ByteBuff multi = new MultiByteBuff(bb1, bb2); + assertEquals(4, multi.getInt()); + assertEquals(l1, multi.getLong()); + multi.mark(); + assertEquals((short) 2, multi.getShort()); + multi.reset(); + assertEquals((short) 2, multi.getShort()); + multi.mark(); + assertEquals(l2, multi.getLong()); + multi.reset(); + assertEquals(l2, multi.getLong()); + multi.mark(); + assertEquals(l3, multi.getLong()); + multi.reset(); + assertEquals(l3, multi.getLong()); + // Try absolute gets with mark and reset + multi.mark(); + assertEquals(l2, multi.getLong(14)); + multi.reset(); + assertEquals(l3, multi.getLong(22)); + // Just reset to see what happens + multi.reset(); + assertEquals(l2, multi.getLong(14)); + multi.mark(); + assertEquals(l3, multi.getLong(22)); + multi.reset(); + } + + @Test + public void testSkipNBytes() { + ByteBuffer bb1 = ByteBuffer.allocate(15); + ByteBuffer bb2 = ByteBuffer.allocate(15); + bb1.putInt(4); + long l1 = 45L, l2 = 100L, l3 = 12345L; + bb1.putLong(l1); + bb1.putShort((short) 2); + byte[] b = Bytes.toBytes(l2); + bb1.put(b, 0, 1); + bb2.put(b, 1, 7); + bb2.putLong(l3); + MultiByteBuff multi = new MultiByteBuff(bb1, bb2); + assertEquals(4, multi.getInt()); + assertEquals(l1, multi.getLong()); + multi.skip(10); + assertEquals(l3, multi.getLong()); + } + + @Test + public void testMoveBack() { + ByteBuffer bb1 = ByteBuffer.allocate(15); + ByteBuffer bb2 = ByteBuffer.allocate(15); + bb1.putInt(4); + long l1 = 45L, l2 = 100L, l3 = 12345L; + bb1.putLong(l1); + bb1.putShort((short) 2); + byte[] b = Bytes.toBytes(l2); + bb1.put(b, 0, 1); + bb2.put(b, 1, 7); + bb2.putLong(l3); + MultiByteBuff multi = new MultiByteBuff(bb1, bb2); + assertEquals(4, multi.getInt()); + assertEquals(l1, multi.getLong()); + multi.skip(10); + multi.moveBack(4); + multi.moveBack(6); + multi.moveBack(8); + assertEquals(l1, multi.getLong()); + } + + @Test + public void testSubBuffer() { + ByteBuffer bb1 = ByteBuffer.allocateDirect(10); + ByteBuffer bb2 = ByteBuffer.allocateDirect(10); + MultiByteBuff multi = new MultiByteBuff(bb1, bb2); + long l1 = 1234L, l2 = 100L; + multi.putLong(l1); + multi.putLong(l2); + multi.rewind(); + ByteBuffer sub = multi.asSubByteBuffer(Bytes.SIZEOF_LONG); + assertTrue(bb1 == sub); + assertEquals(l1, ByteBufferUtils.toLong(sub, sub.position())); + multi.skip(Bytes.SIZEOF_LONG); + sub = multi.asSubByteBuffer(Bytes.SIZEOF_LONG); + assertFalse(bb1 == sub); + assertFalse(bb2 == sub); + assertEquals(l2, ByteBufferUtils.toLong(sub, sub.position())); + multi.rewind(); + Pair<ByteBuffer, Integer> p = new Pair<ByteBuffer, Integer>(); + multi.asSubByteBuffer(8, Bytes.SIZEOF_LONG, p); + assertFalse(bb1 == p.getFirst()); + assertFalse(bb2 == p.getFirst()); + assertEquals(0, p.getSecond().intValue()); + assertEquals(l2, ByteBufferUtils.toLong(sub, p.getSecond())); + } + + @Test + public void testSliceDuplicateMethods() throws Exception { + ByteBuffer bb1 = ByteBuffer.allocateDirect(10); + ByteBuffer bb2 = ByteBuffer.allocateDirect(15); + MultiByteBuff multi = new MultiByteBuff(bb1, bb2); + long l1 = 1234L, l2 = 100L; + multi.put((byte) 2); + multi.putLong(l1); + multi.putLong(l2); + multi.putInt(45); + multi.position(1); + multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG)); + MultiByteBuff sliced = multi.slice(); + assertEquals(0, sliced.position()); + assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit()); + assertEquals(l1, sliced.getLong()); + assertEquals(l2, sliced.getLong()); + MultiByteBuff dup = multi.duplicate(); + assertEquals(1, dup.position()); + assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit()); + assertEquals(l1, dup.getLong()); + assertEquals(l2, dup.getLong()); + } + + @Test + public void testGetWithPosOnMultiBuffers() throws IOException { + byte[] b = new byte[4]; + byte[] b1 = new byte[4]; + ByteBuffer bb1 = ByteBuffer.wrap(b); + ByteBuffer bb2 = ByteBuffer.wrap(b1); + MultiByteBuff mbb1 = new MultiByteBuff(bb1, bb2); + mbb1.position(2); + mbb1.putInt(4); + int res = mbb1.getInt(2); + byte[] bres = new byte[4]; + bres[0] = mbb1.get(2); + bres[1] = mbb1.get(3); + bres[2] = mbb1.get(4); + bres[3] = mbb1.get(5); + int expected = Bytes.toInt(bres); + assertEquals(res, expected); + } + + @Test + public void testGetIntStrictlyForwardWithPosOnMultiBuffers() throws IOException { + byte[] b = new byte[4]; + byte[] b1 = new byte[8]; + ByteBuffer bb1 = ByteBuffer.wrap(b); + ByteBuffer bb2 = ByteBuffer.wrap(b1); + MultiByteBuff mbb1 = new MultiByteBuff(bb1, bb2); + mbb1.position(2); + mbb1.putInt(4); + mbb1.position(7); + mbb1.put((byte) 2); + mbb1.putInt(3); + mbb1.position(0); + mbb1.getIntStrictlyForward(4); + byte res = mbb1.get(7); + assertEquals((byte) 2, res); + mbb1.position(7); + int intRes = mbb1.getIntStrictlyForward(8); + assertEquals(3, intRes); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java deleted file mode 100644 index ddab391..0000000 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/nio/TestMultiByteBuffer.java +++ /dev/null @@ -1,316 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.nio; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.BufferOverflowException; -import java.nio.ByteBuffer; - -import org.apache.hadoop.hbase.testclassification.MiscTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.ByteBufferUtils; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.WritableUtils; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ MiscTests.class, SmallTests.class }) -public class TestMultiByteBuffer { - - @Test - public void testWritesAndReads() { - // Absolute reads - ByteBuffer bb1 = ByteBuffer.allocate(15); - ByteBuffer bb2 = ByteBuffer.allocate(15); - int i1 = 4; - bb1.putInt(i1); - long l1 = 45L, l2 = 100L, l3 = 12345L; - bb1.putLong(l1); - short s1 = 2; - bb1.putShort(s1); - byte[] b = Bytes.toBytes(l2); - bb1.put(b, 0, 1); - bb2.put(b, 1, 7); - bb2.putLong(l3); - MultiByteBuffer mbb = new MultiByteBuffer(bb1, bb2); - assertEquals(l1, mbb.getLong(4)); - assertEquals(l2, mbb.getLong(14)); - assertEquals(l3, mbb.getLong(22)); - assertEquals(i1, mbb.getInt(0)); - assertEquals(s1, mbb.getShort(12)); - // Relative reads - assertEquals(i1, mbb.getInt()); - assertEquals(l1, mbb.getLong()); - assertEquals(s1, mbb.getShort()); - assertEquals(l2, mbb.getLong()); - assertEquals(l3, mbb.getLong()); - // Absolute writes - bb1 = ByteBuffer.allocate(15); - bb2 = ByteBuffer.allocate(15); - mbb = new MultiByteBuffer(bb1, bb2); - byte b1 = 5, b2 = 31; - mbb.put(b1); - mbb.putLong(l1); - mbb.putInt(i1); - mbb.putLong(l2); - mbb.put(b2); - mbb.position(mbb.position() + 2); - try { - mbb.putLong(l3); - fail("'Should have thrown BufferOverflowException"); - } catch (BufferOverflowException e) { - } - mbb.position(mbb.position() - 2); - mbb.putLong(l3); - mbb.rewind(); - assertEquals(b1, mbb.get()); - assertEquals(l1, mbb.getLong()); - assertEquals(i1, mbb.getInt()); - assertEquals(l2, mbb.getLong()); - assertEquals(b2, mbb.get()); - assertEquals(l3, mbb.getLong()); - mbb.put(21, b1); - mbb.position(21); - assertEquals(b1, mbb.get()); - mbb.put(b); - assertEquals(l2, mbb.getLong(22)); - } - - @Test - public void testGetVlong() throws IOException { - long vlong = 453478; - ByteArrayOutputStream baos = new ByteArrayOutputStream(10); - DataOutput out = new DataOutputStream(baos); - WritableUtils.writeVLong(out, vlong); - ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray()); - MultiByteBuffer mbb = new MultiByteBuffer(bb); - assertEquals(vlong, mbb.getVLong()); - } - - @Test - public void testArrayBasedMethods() { - byte[] b = new byte[15]; - ByteBuffer bb1 = ByteBuffer.wrap(b, 1, 10).slice(); - ByteBuffer bb2 = ByteBuffer.allocate(15); - MultiByteBuffer mbb1 = new MultiByteBuffer(bb1, bb2); - assertFalse(mbb1.hasArray()); - try { - mbb1.array(); - fail(); - } catch (UnsupportedOperationException e) { - } - try { - mbb1.arrayOffset(); - fail(); - } catch (UnsupportedOperationException e) { - } - mbb1 = new MultiByteBuffer(bb1); - assertTrue(mbb1.hasArray()); - assertEquals(1, mbb1.arrayOffset()); - assertEquals(b, mbb1.array()); - mbb1 = new MultiByteBuffer(ByteBuffer.allocateDirect(10)); - assertFalse(mbb1.hasArray()); - try { - mbb1.array(); - fail(); - } catch (UnsupportedOperationException e) { - } - try { - mbb1.arrayOffset(); - fail(); - } catch (UnsupportedOperationException e) { - } - } - - @Test - public void testMarkAndReset() { - ByteBuffer bb1 = ByteBuffer.allocateDirect(15); - ByteBuffer bb2 = ByteBuffer.allocateDirect(15); - bb1.putInt(4); - long l1 = 45L, l2 = 100L, l3 = 12345L; - bb1.putLong(l1); - bb1.putShort((short) 2); - byte[] b = Bytes.toBytes(l2); - bb1.put(b, 0, 1); - bb2.put(b, 1, 7); - bb2.putLong(l3); - MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2); - assertEquals(4, multi.getInt()); - assertEquals(l1, multi.getLong()); - multi.mark(); - assertEquals((short) 2, multi.getShort()); - multi.reset(); - assertEquals((short) 2, multi.getShort()); - multi.mark(); - assertEquals(l2, multi.getLong()); - multi.reset(); - assertEquals(l2, multi.getLong()); - multi.mark(); - assertEquals(l3, multi.getLong()); - multi.reset(); - assertEquals(l3, multi.getLong()); - // Try absolute gets with mark and reset - multi.mark(); - assertEquals(l2, multi.getLong(14)); - multi.reset(); - assertEquals(l3, multi.getLong(22)); - // Just reset to see what happens - multi.reset(); - assertEquals(l2, multi.getLong(14)); - multi.mark(); - assertEquals(l3, multi.getLong(22)); - multi.reset(); - } - - @Test - public void testSkipNBytes() { - ByteBuffer bb1 = ByteBuffer.allocate(15); - ByteBuffer bb2 = ByteBuffer.allocate(15); - bb1.putInt(4); - long l1 = 45L, l2 = 100L, l3 = 12345L; - bb1.putLong(l1); - bb1.putShort((short) 2); - byte[] b = Bytes.toBytes(l2); - bb1.put(b, 0, 1); - bb2.put(b, 1, 7); - bb2.putLong(l3); - MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2); - assertEquals(4, multi.getInt()); - assertEquals(l1, multi.getLong()); - multi.skip(10); - assertEquals(l3, multi.getLong()); - } - - @Test - public void testMoveBack() { - ByteBuffer bb1 = ByteBuffer.allocate(15); - ByteBuffer bb2 = ByteBuffer.allocate(15); - bb1.putInt(4); - long l1 = 45L, l2 = 100L, l3 = 12345L; - bb1.putLong(l1); - bb1.putShort((short) 2); - byte[] b = Bytes.toBytes(l2); - bb1.put(b, 0, 1); - bb2.put(b, 1, 7); - bb2.putLong(l3); - MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2); - assertEquals(4, multi.getInt()); - assertEquals(l1, multi.getLong()); - multi.skip(10); - multi.moveBack(4); - multi.moveBack(6); - multi.moveBack(8); - assertEquals(l1, multi.getLong()); - } - - @Test - public void testSubBuffer() { - ByteBuffer bb1 = ByteBuffer.allocateDirect(10); - ByteBuffer bb2 = ByteBuffer.allocateDirect(10); - MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2); - long l1 = 1234L, l2 = 100L; - multi.putLong(l1); - multi.putLong(l2); - multi.rewind(); - ByteBuffer sub = multi.asSubBuffer(Bytes.SIZEOF_LONG); - assertTrue(bb1 == sub); - assertEquals(l1, ByteBufferUtils.toLong(sub, sub.position())); - multi.skip(Bytes.SIZEOF_LONG); - sub = multi.asSubBuffer(Bytes.SIZEOF_LONG); - assertFalse(bb1 == sub); - assertFalse(bb2 == sub); - assertEquals(l2, ByteBufferUtils.toLong(sub, sub.position())); - multi.rewind(); - Pair<ByteBuffer, Integer> p = multi.asSubBuffer(8, Bytes.SIZEOF_LONG); - assertFalse(bb1 == p.getFirst()); - assertFalse(bb2 == p.getFirst()); - assertEquals(0, p.getSecond().intValue()); - assertEquals(l2, ByteBufferUtils.toLong(sub, p.getSecond())); - } - - @Test - public void testSliceDuplicateMethods() throws Exception { - ByteBuffer bb1 = ByteBuffer.allocateDirect(10); - ByteBuffer bb2 = ByteBuffer.allocateDirect(15); - MultiByteBuffer multi = new MultiByteBuffer(bb1, bb2); - long l1 = 1234L, l2 = 100L; - multi.put((byte) 2); - multi.putLong(l1); - multi.putLong(l2); - multi.putInt(45); - multi.position(1); - multi.limit(multi.position() + (2 * Bytes.SIZEOF_LONG)); - MultiByteBuffer sliced = multi.slice(); - assertEquals(0, sliced.position()); - assertEquals((2 * Bytes.SIZEOF_LONG), sliced.limit()); - assertEquals(l1, sliced.getLong()); - assertEquals(l2, sliced.getLong()); - MultiByteBuffer dup = multi.duplicate(); - assertEquals(1, dup.position()); - assertEquals(dup.position() + (2 * Bytes.SIZEOF_LONG), dup.limit()); - assertEquals(l1, dup.getLong()); - assertEquals(l2, dup.getLong()); - } - - @Test - public void testGetWithPosOnMultiBuffers() throws IOException { - byte[] b = new byte[4]; - byte[] b1 = new byte[4]; - ByteBuffer bb1 = ByteBuffer.wrap(b); - ByteBuffer bb2 = ByteBuffer.wrap(b1); - MultiByteBuffer mbb1 = new MultiByteBuffer(bb1, bb2); - mbb1.position(2); - mbb1.putInt(4); - int res = mbb1.getInt(2); - byte[] bres = new byte[4]; - bres[0] = mbb1.get(2); - bres[1] = mbb1.get(3); - bres[2] = mbb1.get(4); - bres[3] = mbb1.get(5); - int expected = Bytes.toInt(bres); - assertEquals(res, expected); - } - - @Test - public void testGetIntStrictlyForwardWithPosOnMultiBuffers() throws IOException { - byte[] b = new byte[4]; - byte[] b1 = new byte[4]; - ByteBuffer bb1 = ByteBuffer.wrap(b); - ByteBuffer bb2 = ByteBuffer.wrap(b1); - MultiByteBuffer mbb1 = new MultiByteBuffer(bb1, bb2); - mbb1.position(2); - mbb1.putInt(4); - mbb1.position(7); - mbb1.put((byte) 2); - mbb1.position(0); - mbb1.getIntStrictlyForward(4); - byte res = mbb1.get(7); - assertEquals((byte) 2, res); - } -}
