HBASE-12213 HFileBlock backed by Array of ByteBuffers (Ram)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/834f87b2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/834f87b2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/834f87b2 Branch: refs/heads/master Commit: 834f87b23de533783ba5f5b858327a6164f17f55 Parents: a249989 Author: ramkrishna <[email protected]> Authored: Fri Jul 17 13:27:29 2015 +0530 Committer: ramkrishna <[email protected]> Committed: Fri Jul 17 13:27:29 2015 +0530 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hbase/KeyValue.java | 10 +- .../hadoop/hbase/io/ByteBuffInputStream.java | 100 ++ .../hadoop/hbase/io/ByteBufferInputStream.java | 101 -- .../io/encoding/BufferedDataBlockEncoder.java | 4 +- .../io/encoding/CopyKeyDataBlockEncoder.java | 12 +- .../hbase/io/encoding/DataBlockEncoder.java | 3 +- .../hbase/io/encoding/DiffKeyDeltaEncoder.java | 15 +- .../hbase/io/encoding/FastDiffDeltaEncoder.java | 18 +- .../io/encoding/HFileBlockDecodingContext.java | 23 +- .../HFileBlockDefaultDecodingContext.java | 8 +- .../io/encoding/PrefixKeyDeltaEncoder.java | 18 +- .../apache/hadoop/hbase/io/hfile/BlockType.java | 5 +- .../org/apache/hadoop/hbase/nio/ByteBuff.java | 438 +++++++ .../apache/hadoop/hbase/nio/MultiByteBuff.java | 1100 ++++++++++++++++++ .../hadoop/hbase/nio/MultiByteBuffer.java | 1047 ----------------- .../apache/hadoop/hbase/nio/SingleByteBuff.java | 312 +++++ .../hadoop/hbase/util/ByteBufferArray.java | 64 + .../hadoop/hbase/util/ByteBufferUtils.java | 118 +- .../java/org/apache/hadoop/hbase/util/Hash.java | 2 + .../apache/hadoop/hbase/util/UnsafeAccess.java | 68 ++ .../hbase/io/TestByteBufferInputStream.java | 82 -- .../hbase/io/TestMultiByteBuffInputStream.java | 83 ++ .../hadoop/hbase/nio/TestMultiByteBuff.java | 324 ++++++ .../hadoop/hbase/nio/TestMultiByteBuffer.java | 316 ----- .../hbase/codec/prefixtree/PrefixTreeCodec.java | 10 +- .../codec/prefixtree/PrefixTreeSeeker.java | 1 + .../hbase/io/hfile/CacheableDeserializer.java | 6 +- .../hbase/io/hfile/CompoundBloomFilter.java | 8 +- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 4 +- .../hadoop/hbase/io/hfile/HFileBlock.java | 106 +- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 41 +- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 122 +- .../hbase/io/hfile/MemcachedBlockCache.java | 5 +- .../hbase/io/hfile/bucket/BucketCache.java | 11 +- .../io/hfile/bucket/ByteBufferIOEngine.java | 14 + .../hbase/io/hfile/bucket/FileIOEngine.java | 18 + .../hadoop/hbase/io/hfile/bucket/IOEngine.java | 21 + .../hbase/mapreduce/LoadIncrementalHFiles.java | 2 - .../hadoop/hbase/regionserver/StoreFile.java | 4 +- .../hadoop/hbase/regionserver/StoreScanner.java | 1 - .../apache/hadoop/hbase/util/BloomFilter.java | 6 +- .../hadoop/hbase/util/BloomFilterChunk.java | 31 - .../hadoop/hbase/util/BloomFilterUtil.java | 16 +- .../hbase/util/hbck/TableLockChecker.java | 1 - .../hadoop/hbase/client/TestFromClientSide.java | 2 + .../io/encoding/TestDataBlockEncoders.java | 4 +- .../hadoop/hbase/io/hfile/CacheTestUtils.java | 13 +- .../hadoop/hbase/io/hfile/TestCacheConfig.java | 6 +- .../hadoop/hbase/io/hfile/TestChecksum.java | 6 +- .../apache/hadoop/hbase/io/hfile/TestHFile.java | 11 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 33 +- .../io/hfile/TestHFileBlockCompatibility.java | 7 +- .../hbase/io/hfile/TestHFileBlockIndex.java | 12 +- .../hbase/io/hfile/TestHFileWriterV2.java | 6 +- .../hbase/io/hfile/TestHFileWriterV3.java | 6 +- .../io/hfile/bucket/TestByteBufferIOEngine.java | 45 + .../hadoop/hbase/util/TestBloomFilterChunk.java | 68 +- .../hadoop/hbase/util/TestByteBuffUtils.java | 78 ++ .../hadoop/hbase/util/TestByteBufferUtils.java | 2 +- 59 files changed, 3104 insertions(+), 1894 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 8c73984..368bf41 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -2613,6 +2613,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, * Hence create a Keyvalue(aka Cell) that would help in comparing as two cells */ public static class KeyOnlyKeyValue extends KeyValue { + private short rowLen = -1; public KeyOnlyKeyValue() { } @@ -2624,6 +2625,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, this.bytes = b; this.length = length; this.offset = offset; + this.rowLen = Bytes.toShort(this.bytes, this.offset); } @Override @@ -2642,6 +2644,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, this.bytes = key; this.offset = offset; this.length = length; + this.rowLen = Bytes.toShort(this.bytes, this.offset); } @Override @@ -2699,7 +2702,7 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, @Override public short getRowLength() { - return Bytes.toShort(this.bytes, getKeyOffset()); + return rowLen; } @Override @@ -2769,5 +2772,10 @@ public class KeyValue implements Cell, HeapSize, Cloneable, SettableSequenceId, public boolean equals(Object other) { return super.equals(other); } + + @Override + public long heapSize() { + return super.heapSize() + Bytes.SIZEOF_SHORT; + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java new file mode 100644 index 0000000..4f6b3c2 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffInputStream.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.io; + +import java.io.InputStream; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; + +/** + * Not thread safe! + * <p> + * Please note that the reads will cause position movement on wrapped ByteBuff. + */ [email protected] +public class ByteBuffInputStream extends InputStream { + + private ByteBuff buf; + + public ByteBuffInputStream(ByteBuff buf) { + this.buf = buf; + } + + /** + * Reads the next byte of data from this input stream. The value byte is returned as an + * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available + * because the end of the stream has been reached, the value <code>-1</code> is returned. + * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached. + */ + public int read() { + if (this.buf.hasRemaining()) { + return (this.buf.get() & 0xff); + } + return -1; + } + + /** + * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from + * given offset). + * @param b the array into which the data is read. + * @param off the start offset in the destination array <code>b</code> + * @param len the maximum number of bytes to read. + * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even + * 1 byte can be read because the end of the stream has been reached. + */ + public int read (byte b[], int off, int len) { + int avail = available(); + if (avail <= 0) { + return -1; + } + if (len <= 0) { + return 0; + } + + if (len > avail) { + len = avail; + } + this.buf.get(b, off, len); + return len; + } + + /** + * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the + * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is + * equal to the smaller of <code>n</code> and remaining bytes in the stream. + * @param n the number of bytes to be skipped. + * @return the actual number of bytes skipped. + */ + public long skip(long n) { + long k = Math.min(n, available()); + if (k <= 0) { + return 0; + } + this.buf.skip((int) k); + return k; + } + + /** + * @return the number of remaining bytes that can be read (or skipped + * over) from this input stream. + */ + public int available() { + return this.buf.remaining(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java deleted file mode 100644 index 1530ccd..0000000 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferInputStream.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.io; - -import java.io.InputStream; -import java.nio.ByteBuffer; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * Not thread safe! - * <p> - * Please note that the reads will cause position movement on wrapped ByteBuffer. - */ [email protected] -public class ByteBufferInputStream extends InputStream { - - private ByteBuffer buf; - - public ByteBufferInputStream(ByteBuffer buf) { - this.buf = buf; - } - - /** - * Reads the next byte of data from this input stream. The value byte is returned as an - * <code>int</code> in the range <code>0</code> to <code>255</code>. If no byte is available - * because the end of the stream has been reached, the value <code>-1</code> is returned. - * @return the next byte of data, or <code>-1</code> if the end of the stream has been reached. - */ - public int read() { - if (this.buf.hasRemaining()) { - return (this.buf.get() & 0xff); - } - return -1; - } - - /** - * Reads up to next <code>len</code> bytes of data from buffer into passed array(starting from - * given offset). - * @param b the array into which the data is read. - * @param off the start offset in the destination array <code>b</code> - * @param len the maximum number of bytes to read. - * @return the total number of bytes actually read into the buffer, or <code>-1</code> if not even - * 1 byte can be read because the end of the stream has been reached. - */ - public int read(byte b[], int off, int len) { - int avail = available(); - if (avail <= 0) { - return -1; - } - - if (len > avail) { - len = avail; - } - if (len <= 0) { - return 0; - } - - this.buf.get(b, off, len); - return len; - } - - /** - * Skips <code>n</code> bytes of input from this input stream. Fewer bytes might be skipped if the - * end of the input stream is reached. The actual number <code>k</code> of bytes to be skipped is - * equal to the smaller of <code>n</code> and remaining bytes in the stream. - * @param n the number of bytes to be skipped. - * @return the actual number of bytes skipped. - */ - public long skip(long n) { - long k = Math.min(n, available()); - if (k < 0) { - k = 0; - } - this.buf.position((int) (this.buf.position() + k)); - return k; - } - - /** - * @return the number of remaining bytes that can be read (or skipped - * over) from this input stream. - */ - public int available() { - return this.buf.remaining(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java index 03875dc..966c59b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/BufferedDataBlockEncoder.java @@ -601,7 +601,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { kvBuffer.putInt(current.keyLength); kvBuffer.putInt(current.valueLength); kvBuffer.put(current.keyBuffer, 0, current.keyLength); - ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.valueOffset, + ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.valueOffset, current.valueLength); if (current.tagsLength > 0) { // Put short as unsigned @@ -610,7 +610,7 @@ abstract class BufferedDataBlockEncoder implements DataBlockEncoder { if (current.tagsOffset != -1) { // the offset of the tags bytes in the underlying buffer is marked. So the temp // buffer,tagsBuffer was not been used. - ByteBufferUtils.copyFromBufferToBuffer(kvBuffer, currentBuffer, current.tagsOffset, + ByteBufferUtils.copyFromBufferToBuffer(currentBuffer, kvBuffer, current.tagsOffset, current.tagsLength); } else { // When tagsOffset is marked as -1, tag compression was present and so the tags were http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java index 4eea272..662be29 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/CopyKeyDataBlockEncoder.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; @@ -66,13 +67,12 @@ public class CopyKeyDataBlockEncoder extends BufferedDataBlockEncoder { } @Override - public Cell getFirstKeyCellInBlock(ByteBuffer block) { - int keyLength = block.getInt(Bytes.SIZEOF_INT); - ByteBuffer dup = block.duplicate(); + public Cell getFirstKeyCellInBlock(ByteBuff block) { + int keyLength = block.getIntStrictlyForward(Bytes.SIZEOF_INT); int pos = 3 * Bytes.SIZEOF_INT; - dup.position(pos); - dup.limit(pos + keyLength); - return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength); + ByteBuffer key = block.asSubByteBuffer(pos + keyLength).duplicate(); + // TODO : to be changed here for BBCell + return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + pos, keyLength); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java index b0467b8..ce71308 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DataBlockEncoder.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.nio.ByteBuff; /** * Encoding of KeyValue. It aims to be fast and efficient using assumptions: @@ -90,7 +91,7 @@ public interface DataBlockEncoder { * @param block encoded block we want index, the position will not change * @return First key in block as a cell. */ - Cell getFirstKeyCellInBlock(ByteBuffer block); + Cell getFirstKeyCellInBlock(ByteBuff block); /** * Create a HFileBlock seeker which find KeyValues within a block. http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java index f2d4751..90b8e6e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/DiffKeyDeltaEncoder.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -305,15 +306,16 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { } @Override - public Cell getFirstKeyCellInBlock(ByteBuffer block) { + public Cell getFirstKeyCellInBlock(ByteBuff block) { block.mark(); block.position(Bytes.SIZEOF_INT); byte familyLength = block.get(); - ByteBufferUtils.skip(block, familyLength); + block.skip(familyLength); byte flag = block.get(); - int keyLength = ByteBufferUtils.readCompressedInt(block); - ByteBufferUtils.readCompressedInt(block); // valueLength - ByteBufferUtils.readCompressedInt(block); // commonLength + int keyLength = ByteBuff.readCompressedInt(block); + // TODO : See if we can avoid these reads as the read values are not getting used + ByteBuff.readCompressedInt(block); // valueLength + ByteBuff.readCompressedInt(block); // commonLength ByteBuffer result = ByteBuffer.allocate(keyLength); // copy row @@ -341,7 +343,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { // copy the timestamp and type int timestampFitInBytes = ((flag & MASK_TIMESTAMP_LENGTH) >>> SHIFT_TIMESTAMP_LENGTH) + 1; - long timestamp = ByteBufferUtils.readLong(block, timestampFitInBytes); + long timestamp = ByteBuff.readLong(block, timestampFitInBytes); if ((flag & FLAG_TIMESTAMP_SIGN) != 0) { timestamp = -timestamp; } @@ -350,6 +352,7 @@ public class DiffKeyDeltaEncoder extends BufferedDataBlockEncoder { block.get(result.array(), pos, Bytes.SIZEOF_BYTE); block.reset(); + // The result is already a BB. So always we will create a KeyOnlyKv. return new KeyValue.KeyOnlyKeyValue(result.array(), 0, keyLength); } http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java index f750e09..fa4adbd 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/FastDiffDeltaEncoder.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -354,18 +355,17 @@ public class FastDiffDeltaEncoder extends BufferedDataBlockEncoder { } @Override - public Cell getFirstKeyCellInBlock(ByteBuffer block) { + public Cell getFirstKeyCellInBlock(ByteBuff block) { block.mark(); block.position(Bytes.SIZEOF_INT + Bytes.SIZEOF_BYTE); - int keyLength = ByteBufferUtils.readCompressedInt(block); - ByteBufferUtils.readCompressedInt(block); // valueLength - ByteBufferUtils.readCompressedInt(block); // commonLength - int pos = block.position(); + int keyLength = ByteBuff.readCompressedInt(block); + // TODO : See if we can avoid these reads as the read values are not getting used + ByteBuff.readCompressedInt(block); // valueLength + ByteBuff.readCompressedInt(block); // commonLength + ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate(); block.reset(); - ByteBuffer dup = block.duplicate(); - dup.position(pos); - dup.limit(pos + keyLength); - return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength); + // TODO : Change to BBCell. + return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java index 37001cc..ffdb694 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDecodingContext.java @@ -17,10 +17,10 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.nio.ByteBuff; /** * A decoding context that is created by a reader's encoder, and is shared @@ -32,22 +32,27 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; public interface HFileBlockDecodingContext { /** - * Perform all actions that need to be done before the encoder's real decoding process. - * Decompression needs to be done if {@link HFileContext#getCompression()} returns a valid compression + * Perform all actions that need to be done before the encoder's real decoding + * process. Decompression needs to be done if + * {@link HFileContext#getCompression()} returns a valid compression * algorithm. * - * @param onDiskSizeWithoutHeader numBytes after block and encoding headers - * @param uncompressedSizeWithoutHeader numBytes without header required to store the block after + * @param onDiskSizeWithoutHeader + * numBytes after block and encoding headers + * @param uncompressedSizeWithoutHeader + * numBytes without header required to store the block after * decompressing (not decoding) - * @param blockBufferWithoutHeader ByteBuffer pointed after the header but before the data - * @param onDiskBlock on disk data to be decoded + * @param blockBufferWithoutHeader + * ByteBuffer pointed after the header but before the data + * @param onDiskBlock + * on disk data to be decoded * @throws IOException */ void prepareDecoding( int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, - ByteBuffer blockBufferWithoutHeader, - ByteBuffer onDiskBlock + ByteBuff blockBufferWithoutHeader, + ByteBuff onDiskBlock ) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java index 78bb0d6..30382d9 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/HFileBlockDefaultDecodingContext.java @@ -19,17 +19,17 @@ package org.apache.hadoop.hbase.io.encoding; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.io.ByteBufferInputStream; +import org.apache.hadoop.hbase.io.ByteBuffInputStream; import org.apache.hadoop.hbase.io.TagCompressionContext; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Decryptor; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.hfile.HFileContext; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; /** @@ -51,8 +51,8 @@ public class HFileBlockDefaultDecodingContext implements @Override public void prepareDecoding(int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, - ByteBuffer blockBufferWithoutHeader, ByteBuffer onDiskBlock) throws IOException { - InputStream in = new DataInputStream(new ByteBufferInputStream(onDiskBlock)); + ByteBuff blockBufferWithoutHeader, ByteBuff onDiskBlock) throws IOException { + InputStream in = new DataInputStream(new ByteBuffInputStream(onDiskBlock)); Encryption.Context cryptoContext = fileContext.getEncryptionContext(); if (cryptoContext != Encryption.Context.NONE) { http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java index 15608cc..6e89de4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/encoding/PrefixKeyDeltaEncoder.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; @@ -172,22 +173,21 @@ public class PrefixKeyDeltaEncoder extends BufferedDataBlockEncoder { } @Override - public Cell getFirstKeyCellInBlock(ByteBuffer block) { + public Cell getFirstKeyCellInBlock(ByteBuff block) { block.mark(); block.position(Bytes.SIZEOF_INT); - int keyLength = ByteBufferUtils.readCompressedInt(block); - ByteBufferUtils.readCompressedInt(block); - int commonLength = ByteBufferUtils.readCompressedInt(block); + int keyLength = ByteBuff.readCompressedInt(block); + // TODO : See if we can avoid these reads as the read values are not getting used + ByteBuff.readCompressedInt(block); + int commonLength = ByteBuff.readCompressedInt(block); if (commonLength != 0) { throw new AssertionError("Nonzero common length in the first key in " + "block: " + commonLength); } - int pos = block.position(); + ByteBuffer key = block.asSubByteBuffer(keyLength).duplicate(); block.reset(); - ByteBuffer dup = block.duplicate(); - dup.position(pos); - dup.limit(pos + keyLength); - return new KeyValue.KeyOnlyKeyValue(dup.array(), dup.arrayOffset() + pos, keyLength); + // TODO : Change to BBCell + return new KeyValue.KeyOnlyKeyValue(key.array(), key.arrayOffset() + key.position(), keyLength); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java index 0db584e..4228f57 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java @@ -26,6 +26,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.Bytes; /** @@ -131,7 +132,7 @@ public enum BlockType { out.write(magic); } - public void write(ByteBuffer buf) { + public void write(ByteBuff buf) { buf.put(magic); } @@ -161,7 +162,7 @@ public enum BlockType { return parse(buf, 0, buf.length); } - public static BlockType read(ByteBuffer buf) throws IOException { + public static BlockType read(ByteBuff buf) throws IOException { byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), MAGIC_LENGTH)]; buf.get(magicBuf); BlockType blockType = parse(magicBuf, 0, magicBuf.length); http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java new file mode 100644 index 0000000..14e77a7 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/ByteBuff.java @@ -0,0 +1,438 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.nio; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +/** + * An abstract class that abstracts out as to how the byte buffers are used, + * either single or multiple. We have this interface because the java's ByteBuffers + * cannot be sub-classed. This class provides APIs similar to the ones provided + * in java's nio ByteBuffers and allows you to do positional reads/writes and relative + * reads and writes on the underlying BB. In addition to it, we have some additional APIs which + * helps us in the read path. + */ [email protected] +public abstract class ByteBuff { + /** + * @return this ByteBuff's current position + */ + public abstract int position(); + + /** + * Sets this ByteBuff's position to the given value. + * @param position + * @return this object + */ + public abstract ByteBuff position(int position); + + /** + * Jumps the current position of this ByteBuff by specified length. + * @param len the length to be skipped + */ + public abstract ByteBuff skip(int len); + + /** + * Jumps back the current position of this ByteBuff by specified length. + * @param len the length to move back + */ + public abstract ByteBuff moveBack(int len); + + /** + * @return the total capacity of this ByteBuff. + */ + public abstract int capacity(); + + /** + * Returns the limit of this ByteBuff + * @return limit of the ByteBuff + */ + public abstract int limit(); + + /** + * Marks the limit of this ByteBuff. + * @param limit + * @return This ByteBuff + */ + public abstract ByteBuff limit(int limit); + + /** + * Rewinds this ByteBuff and the position is set to 0 + * @return this object + */ + public abstract ByteBuff rewind(); + + /** + * Marks the current position of the ByteBuff + * @return this object + */ + public abstract ByteBuff mark(); + + /** + * Returns bytes from current position till length specified, as a single ByteBuffer. When all + * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item + * as such will be returned. So users are warned not to change the position or limit of this + * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required + * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy + * the bytes to a newly created ByteBuffer of required size and return that. + * + * @param length number of bytes required. + * @return bytes from current position till length specified, as a single ByteButter. + */ + public abstract ByteBuffer asSubByteBuffer(int length); + + /** + * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these + * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as + * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are + * warned not to change the position or limit of this returned ByteBuffer. When the required bytes + * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created + * ByteBuffer of required size and return that. + * + * @param offset the offset in this ByteBuff from where the subBuffer should be created + * @param length the length of the subBuffer + * @param pair a pair that will have the bytes from the current position till length specified, + * as a single ByteBuffer and offset in that Buffer where the bytes starts. + * Since this API gets called in a loop we are passing a pair to it which could be created + * outside the loop and the method would set the values on the pair that is passed in by + * the caller. Thus it avoids more object creations that would happen if the pair that is + * returned is created by this method every time. + */ + public abstract void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair); + + /** + * Returns the number of elements between the current position and the + * limit. + * @return the remaining elements in this ByteBuff + */ + public abstract int remaining(); + + /** + * Returns true if there are elements between the current position and the limt + * @return true if there are elements, false otherwise + */ + public abstract boolean hasRemaining(); + + /** + * Similar to {@link ByteBuffer}.reset(), ensures that this ByteBuff + * is reset back to last marked position. + * @return This ByteBuff + */ + public abstract ByteBuff reset(); + + /** + * Returns an ByteBuff which is a sliced version of this ByteBuff. The position, limit and mark + * of the new ByteBuff will be independent than that of the original ByteBuff. + * The content of the new ByteBuff will start at this ByteBuff's current position + * @return a sliced ByteBuff + */ + public abstract ByteBuff slice(); + + /** + * Returns an ByteBuff which is a duplicate version of this ByteBuff. The + * position, limit and mark of the new ByteBuff will be independent than that + * of the original ByteBuff. The content of the new ByteBuff will start at + * this ByteBuff's current position The position, limit and mark of the new + * ByteBuff would be identical to this ByteBuff in terms of values. + * + * @return a sliced ByteBuff + */ + public abstract ByteBuff duplicate(); + + /** + * A relative method that returns byte at the current position. Increments the + * current position by the size of a byte. + * @return the byte at the current position + */ + public abstract byte get(); + + /** + * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers + * @param index + * @return the byte at the given index + */ + public abstract byte get(int index); + + /** + * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers. + * The difference for this API from {@link #get(int)} the index specified should be after + * the current position. If not throws IndexOutOfBoundsException + * @param index + * @return the byte value at the given index. + */ + public abstract byte getByteStrictlyForward(int index); + + /** + * Writes a byte to this ByteBuff at the current position and increments the position + * @param b + * @return this object + */ + public abstract ByteBuff put(byte b); + + /** + * Writes a byte to this ByteBuff at the given index + * @param index + * @param b + * @return this object + */ + public abstract ByteBuff put(int index, byte b); + + /** + * Copies the specified number of bytes from this ByteBuff's current position to + * the byte[]'s offset. Also advances the position of the ByteBuff by the given length. + * @param dst + * @param offset within the current array + * @param length upto which the bytes to be copied + */ + public abstract void get(byte[] dst, int offset, int length); + + /** + * Copies the content from this ByteBuff's current position to the byte array and fills it. Also + * advances the position of the ByteBuff by the length of the byte[]. + * @param dst + */ + public abstract void get(byte[] dst); + + /** + * Copies from the given byte[] to this ByteBuff + * @param src + * @param offset the position in the byte array from which the copy should be done + * @param length the length upto which the copy should happen + * @return this ByteBuff + */ + public abstract ByteBuff put(byte[] src, int offset, int length); + + /** + * Copies from the given byte[] to this ByteBuff + * @param src + * @return this ByteBuff + */ + public abstract ByteBuff put(byte[] src); + + /** + * @return true or false if the underlying BB support hasArray + */ + public abstract boolean hasArray(); + + /** + * @return the byte[] if the underlying BB has single BB and hasArray true + */ + public abstract byte[] array(); + + /** + * @return the arrayOffset of the byte[] incase of a single BB backed ByteBuff + */ + public abstract int arrayOffset(); + + /** + * Returns the short value at the current position. Also advances the position by the size + * of short + * + * @return the short value at the current position + */ + public abstract short getShort(); + + /** + * Fetches the short value at the given index. Does not change position of the + * underlying ByteBuffers. The caller is sure that the index will be after + * the current position of this ByteBuff. So even if the current short does not fit in the + * current item we can safely move to the next item and fetch the remaining bytes forming + * the short + * + * @param index + * @return the short value at the given index + */ + public abstract short getShort(int index); + + /** + * Fetches the short at the given index. Does not change position of the underlying ByteBuffers. + * The difference for this API from {@link #getShort(int)} the index specified should be + * after the current position. If not throws IndexOutOfBoundsException + * @param index + * @return the short value at the given index. + */ + public abstract short getShortStrictlyForward(int index); + + /** + * Returns the int value at the current position. Also advances the position by the size of int + * + * @return the int value at the current position + */ + public abstract int getInt(); + + /** + * Writes an int to this ByteBuff at its current position. Also advances the position + * by size of int + * @param value Int value to write + * @return this object + */ + public abstract ByteBuff putInt(int value); + + /** + * Fetches the int at the given index. Does not change position of the underlying ByteBuffers. + * Even if the current int does not fit in the + * current item we can safely move to the next item and fetch the remaining bytes forming + * the int + * + * @param index + * @return the int value at the given index + */ + public abstract int getInt(int index); + + /** + * Fetches the int at the given index. Does not change position of the underlying ByteBuffers. + * The difference for this API from {@link #getInt(int)} the index specified should be after + * the current position. If not throws IndexOutOfBoundsException + * @param index + * @return the int value at the given index. + */ + // TODO: any better name here?? getIntFromSubsequentPosition? or getIntAfterCurrentPosition? + // TODO : Make this relative wrt current position? Follow on JIRA + public abstract int getIntStrictlyForward(int index); + /** + * Returns the long value at the current position. Also advances the position by the size of long + * + * @return the long value at the current position + */ + public abstract long getLong(); + + /** + * Writes a long to this ByteBuff at its current position. + * Also advances the position by size of long + * @param value Long value to write + * @return this object + */ + public abstract ByteBuff putLong(long value); + + /** + * Fetches the long at the given index. Does not change position of the + * underlying ByteBuffers. The caller is sure that the index will be after + * the current position of this ByteBuff. So even if the current long does not fit in the + * current item we can safely move to the next item and fetch the remaining bytes forming + * the long + * + * @param index + * @return the long value at the given index + */ + public abstract long getLong(int index); + + /** + * Fetches the long at the given index. Does not change position of the underlying ByteBuffers. + * The difference for this API from {@link #getLong(int)} the index specified should be after + * the current position. If not throws IndexOutOfBoundsException + * @param index + * @return the long value at the given index. + */ + public abstract long getLongStrictlyForward(int index); + + /** + * Copy the content from this ByteBuff to a byte[] based on the given offset and + * length + * + * @param offset + * the position from where the copy should start + * @param length + * the length upto which the copy has to be done + * @return byte[] with the copied contents from this ByteBuff. + */ + public abstract byte[] toBytes(int offset, int length); + + /** + * Copies the content from this ByteBuff to a ByteBuffer + * Note : This will advance the position marker of {@code out} but not change the position maker + * for this ByteBuff + * @param out the ByteBuffer to which the copy has to happen + * @param sourceOffset the offset in the ByteBuff from which the elements has + * to be copied + * @param length the length in this ByteBuff upto which the elements has to be copied + */ + public abstract void get(ByteBuffer out, int sourceOffset, int length); + + /** + * Copies the contents from the src ByteBuff to this ByteBuff. This will be + * absolute positional copying and + * won't affect the position of any of the buffers. + * @param offset the position in this ByteBuff to which the copy should happen + * @param src the src ByteBuff + * @param srcOffset the offset in the src ByteBuff from where the elements should be read + * @param length the length up to which the copy should happen + */ + public abstract ByteBuff put(int offset, ByteBuff src, int srcOffset, int length); + + // static helper methods + /** + * Read integer from ByteBuff coded in 7 bits and increment position. + * @return Read integer. + */ + public static int readCompressedInt(ByteBuff buf) { + byte b = buf.get(); + if ((b & ByteBufferUtils.NEXT_BIT_MASK) != 0) { + return (b & ByteBufferUtils.VALUE_MASK) + + (readCompressedInt(buf) << ByteBufferUtils.NEXT_BIT_SHIFT); + } + return b & ByteBufferUtils.VALUE_MASK; + } + + /** + * Compares two ByteBuffs + * + * @param buf1 the first ByteBuff + * @param o1 the offset in the first ByteBuff from where the compare has to happen + * @param len1 the length in the first ByteBuff upto which the compare has to happen + * @param buf2 the second ByteBuff + * @param o2 the offset in the second ByteBuff from where the compare has to happen + * @param len2 the length in the second ByteBuff upto which the compare has to happen + * @return Positive if buf1 is bigger than buf2, 0 if they are equal, and negative if buf1 is + * smaller than buf2. + */ + public static int compareTo(ByteBuff buf1, int o1, int len1, ByteBuff buf2, + int o2, int len2) { + if (buf1.hasArray() && buf2.hasArray()) { + return Bytes.compareTo(buf1.array(), buf1.arrayOffset() + o1, len1, buf2.array(), + buf2.arrayOffset() + o2, len2); + } + int end1 = o1 + len1; + int end2 = o2 + len2; + for (int i = o1, j = o2; i < end1 && j < end2; i++, j++) { + int a = buf1.get(i) & 0xFF; + int b = buf2.get(j) & 0xFF; + if (a != b) { + return a - b; + } + } + return len1 - len2; + } + + /** + * Read long which was written to fitInBytes bytes and increment position. + * @param fitInBytes In how many bytes given long is stored. + * @return The value of parsed long. + */ + public static long readLong(ByteBuff in, final int fitInBytes) { + long tmpLength = 0; + for (int i = 0; i < fitInBytes; ++i) { + tmpLength |= (in.get() & 0xffl) << (8l * i); + } + return tmpLength; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/834f87b2/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java new file mode 100644 index 0000000..984ade5 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java @@ -0,0 +1,1100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.nio; + +import java.nio.BufferOverflowException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.InvalidMarkException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ByteBufferUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; + +/** + * Provides a unified view of all the underlying ByteBuffers and will look as if a bigger + * sequential buffer. This class provides similar APIs as in {@link ByteBuffer} to put/get int, + * short, long etc and doing operations like mark, reset, slice etc. This has to be used when + * data is split across multiple byte buffers and we don't want copy them to single buffer + * for reading from it. + */ [email protected] +public class MultiByteBuff extends ByteBuff { + + private final ByteBuffer[] items; + // Pointer to the current item in the MBB + private ByteBuffer curItem = null; + // Index of the current item in the MBB + private int curItemIndex = 0; + + private int limit = 0; + private int limitedItemIndex; + private int markedItemIndex = -1; + private final int[] itemBeginPos; + + public MultiByteBuff(ByteBuffer... items) { + assert items != null; + assert items.length > 0; + this.items = items; + this.curItem = this.items[this.curItemIndex]; + // See below optimization in getInt(int) where we check whether the given index land in current + // item. For this we need to check whether the passed index is less than the next item begin + // offset. To handle this effectively for the last item buffer, we add an extra item into this + // array. + itemBeginPos = new int[items.length + 1]; + int offset = 0; + for (int i = 0; i < items.length; i++) { + ByteBuffer item = items[i]; + item.rewind(); + itemBeginPos[i] = offset; + int l = item.limit() - item.position(); + offset += l; + } + this.limit = offset; + this.itemBeginPos[items.length] = offset + 1; + this.limitedItemIndex = this.items.length - 1; + } + + private MultiByteBuff(ByteBuffer[] items, int[] itemBeginPos, int limit, int limitedIndex, + int curItemIndex, int markedIndex) { + this.items = items; + this.curItemIndex = curItemIndex; + this.curItem = this.items[this.curItemIndex]; + this.itemBeginPos = itemBeginPos; + this.limit = limit; + this.limitedItemIndex = limitedIndex; + this.markedItemIndex = markedIndex; + } + + /** + * @throws UnsupportedOperationException MBB does not support + * array based operations + */ + @Override + public byte[] array() { + throw new UnsupportedOperationException(); + } + + /** + * @throws UnsupportedOperationException MBB does not + * support array based operations + */ + @Override + public int arrayOffset() { + throw new UnsupportedOperationException(); + } + + /** + * @return false. MBB does not support array based operations + */ + @Override + public boolean hasArray() { + return false; + } + + /** + * @return the total capacity of this MultiByteBuffer. + */ + @Override + public int capacity() { + int c = 0; + for (ByteBuffer item : this.items) { + c += item.capacity(); + } + return c; + } + + /** + * Fetches the byte at the given index. Does not change position of the underlying ByteBuffers + * @param index + * @return the byte at the given index + */ + @Override + public byte get(int index) { + int itemIndex = getItemIndex(index); + return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); + } + + @Override + public byte getByteStrictlyForward(int index) { + // Mostly the index specified will land within this current item. Short circuit for that + if(index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) { + throw new IndexOutOfBoundsException("The index " + index + + " should not be less than current position " + this.position()); + } + int itemIndex = getItemIndexFromCurItemIndex(index); + return ByteBufferUtils.toByte(this.items[itemIndex], index - this.itemBeginPos[itemIndex]); + } + + /* + * Returns in which sub ByteBuffer, the given element index will be available. + */ + private int getItemIndex(int elemIndex) { + int index = 1; + while (elemIndex >= this.itemBeginPos[index]) { + index++; + if (index == this.itemBeginPos.length) { + throw new IndexOutOfBoundsException(); + } + } + return index - 1; + } + + /* + * Returns in which sub ByteBuffer, the given element index will be available. In this case we are + * sure that the item will be after MBB's current position + */ + private int getItemIndexFromCurItemIndex(int elemIndex) { + int index = this.curItemIndex; + while (elemIndex >= this.itemBeginPos[index]) { + index++; + if (index == this.itemBeginPos.length) { + throw new IndexOutOfBoundsException(); + } + } + return index - 1; + } + + /** + * Fetches the int at the given index. Does not change position of the underlying ByteBuffers + * @param index + * @return the int value at the given index + */ + public int getInt(int index) { + // Mostly the index specified will land within this current item. Short circuit for that + int itemIndex; + if (this.itemBeginPos[this.curItemIndex] <= index + && this.itemBeginPos[this.curItemIndex + 1] > index) { + itemIndex = this.curItemIndex; + } else { + itemIndex = getItemIndex(index); + } + return getInt(index, itemIndex); + } + + @Override + public int getIntStrictlyForward(int index) { + // Mostly the index specified will land within this current item. Short circuit for that + if(index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) { + throw new IndexOutOfBoundsException("The index " + index + + " should not be less than current position " + this.position()); + } + int itemIndex; + if (this.itemBeginPos[this.curItemIndex + 1] > index) { + itemIndex = this.curItemIndex; + } else { + itemIndex = getItemIndexFromCurItemIndex(index); + } + return getInt(index, itemIndex); + } + + /** + * Fetches the short at the given index. Does not change position of the underlying ByteBuffers + * @param index + * @return the short value at the given index + */ + public short getShort(int index) { + // Mostly the index specified will land within this current item. Short circuit for that + int itemIndex; + if (this.itemBeginPos[this.curItemIndex] <= index + && this.itemBeginPos[this.curItemIndex + 1] > index) { + itemIndex = this.curItemIndex; + } else { + itemIndex = getItemIndex(index); + } + ByteBuffer item = items[itemIndex]; + int offsetInItem = index - this.itemBeginPos[itemIndex]; + if (item.limit() - offsetInItem >= Bytes.SIZEOF_SHORT) { + return ByteBufferUtils.toShort(item, offsetInItem); + } + if (items.length - 1 == itemIndex) { + // means cur item is the last one and we wont be able to read a int. Throw exception + throw new BufferUnderflowException(); + } + ByteBuffer nextItem = items[itemIndex + 1]; + // Get available one byte from this item and remaining one from next + short n = 0; + n ^= item.get(offsetInItem) & 0xFF; + n <<= 8; + n ^= nextItem.get(0) & 0xFF; + return n; + } + + @Override + public short getShortStrictlyForward(int index) { + // Mostly the index specified will land within this current item. Short circuit for that + if(index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) { + throw new IndexOutOfBoundsException("The index " + index + + " should not be less than current position " + this.position()); + } + int itemIndex; + if (this.itemBeginPos[this.curItemIndex + 1] > index) { + itemIndex = this.curItemIndex; + } else { + itemIndex = getItemIndexFromCurItemIndex(index); + } + return getShort(index, itemIndex); + } + + private int getInt(int index, int itemIndex) { + ByteBuffer item = items[itemIndex]; + int offsetInItem = index - this.itemBeginPos[itemIndex]; + int remainingLen = item.limit() - offsetInItem; + if (remainingLen >= Bytes.SIZEOF_INT) { + return ByteBufferUtils.toInt(item, offsetInItem); + } + if (items.length - 1 == itemIndex) { + // means cur item is the last one and we wont be able to read a int. Throw exception + throw new BufferUnderflowException(); + } + ByteBuffer nextItem = items[itemIndex + 1]; + // Get available bytes from this item and remaining from next + int l = 0; + for (int i = offsetInItem; i < item.capacity(); i++) { + l <<= 8; + l ^= item.get(i) & 0xFF; + } + for (int i = 0; i < Bytes.SIZEOF_INT - remainingLen; i++) { + l <<= 8; + l ^= nextItem.get(i) & 0xFF; + } + return l; + } + + private short getShort(int index, int itemIndex) { + ByteBuffer item = items[itemIndex]; + int offsetInItem = index - this.itemBeginPos[itemIndex]; + int remainingLen = item.limit() - offsetInItem; + if (remainingLen >= Bytes.SIZEOF_SHORT) { + return ByteBufferUtils.toShort(item, offsetInItem); + } + if (items.length - 1 == itemIndex) { + // means cur item is the last one and we wont be able to read a int. Throw exception + throw new BufferUnderflowException(); + } + ByteBuffer nextItem = items[itemIndex + 1]; + // Get available bytes from this item and remaining from next + short l = 0; + for (int i = offsetInItem; i < item.capacity(); i++) { + l <<= 8; + l ^= item.get(i) & 0xFF; + } + for (int i = 0; i < Bytes.SIZEOF_SHORT - remainingLen; i++) { + l <<= 8; + l ^= nextItem.get(i) & 0xFF; + } + return l; + } + + private long getLong(int index, int itemIndex) { + ByteBuffer item = items[itemIndex]; + int offsetInItem = index - this.itemBeginPos[itemIndex]; + int remainingLen = item.limit() - offsetInItem; + if (remainingLen >= Bytes.SIZEOF_LONG) { + return ByteBufferUtils.toLong(item, offsetInItem); + } + if (items.length - 1 == itemIndex) { + // means cur item is the last one and we wont be able to read a long. Throw exception + throw new BufferUnderflowException(); + } + ByteBuffer nextItem = items[itemIndex + 1]; + // Get available bytes from this item and remaining from next + long l = 0; + for (int i = offsetInItem; i < item.capacity(); i++) { + l <<= 8; + l ^= item.get(i) & 0xFF; + } + for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) { + l <<= 8; + l ^= nextItem.get(i) & 0xFF; + } + return l; + } + + /** + * Fetches the long at the given index. Does not change position of the underlying ByteBuffers + * @param index + * @return the long value at the given index + */ + public long getLong(int index) { + // Mostly the index specified will land within this current item. Short circuit for that + int itemIndex; + if (this.itemBeginPos[this.curItemIndex] <= index + && this.itemBeginPos[this.curItemIndex + 1] > index) { + itemIndex = this.curItemIndex; + } else { + itemIndex = getItemIndex(index); + } + ByteBuffer item = items[itemIndex]; + int offsetInItem = index - this.itemBeginPos[itemIndex]; + int remainingLen = item.limit() - offsetInItem; + if (remainingLen >= Bytes.SIZEOF_LONG) { + return ByteBufferUtils.toLong(item, offsetInItem); + } + if (items.length - 1 == itemIndex) { + // means cur item is the last one and we wont be able to read a long. Throw exception + throw new BufferUnderflowException(); + } + ByteBuffer nextItem = items[itemIndex + 1]; + // Get available bytes from this item and remaining from next + long l = 0; + for (int i = offsetInItem; i < item.capacity(); i++) { + l <<= 8; + l ^= item.get(i) & 0xFF; + } + for (int i = 0; i < Bytes.SIZEOF_LONG - remainingLen; i++) { + l <<= 8; + l ^= nextItem.get(i) & 0xFF; + } + return l; + } + + @Override + public long getLongStrictlyForward(int index) { + // Mostly the index specified will land within this current item. Short circuit for that + if(index < (this.itemBeginPos[this.curItemIndex] + this.curItem.position())) { + throw new IndexOutOfBoundsException("The index " + index + + " should not be less than current position " + this.position()); + } + int itemIndex; + if (this.itemBeginPos[this.curItemIndex + 1] > index) { + itemIndex = this.curItemIndex; + } else { + itemIndex = getItemIndexFromCurItemIndex(index); + } + return getLong(index, itemIndex); + } + + /** + * @return this MBB's current position + */ + @Override + public int position() { + return itemBeginPos[this.curItemIndex] + this.curItem.position(); + } + + /** + * Sets this MBB's position to the given value. + * @param position + * @return this object + */ + @Override + public MultiByteBuff position(int position) { + // Short circuit for positioning within the cur item. Mostly that is the case. + if (this.itemBeginPos[this.curItemIndex] <= position + && this.itemBeginPos[this.curItemIndex + 1] > position) { + this.curItem.position(position - this.itemBeginPos[this.curItemIndex]); + return this; + } + int itemIndex = getItemIndex(position); + // All items from 0 - curItem-1 set position at end. + for (int i = 0; i < itemIndex; i++) { + this.items[i].position(this.items[i].limit()); + } + // All items after curItem set position at begin + for (int i = itemIndex + 1; i < this.items.length; i++) { + this.items[i].position(0); + } + this.curItem = this.items[itemIndex]; + this.curItem.position(position - this.itemBeginPos[itemIndex]); + this.curItemIndex = itemIndex; + return this; + } + + /** + * Rewinds this MBB and the position is set to 0 + * @return this object + */ + @Override + public MultiByteBuff rewind() { + for (int i = 0; i < this.items.length; i++) { + this.items[i].rewind(); + } + this.curItemIndex = 0; + this.curItem = this.items[this.curItemIndex]; + this.markedItemIndex = -1; + return this; + } + + /** + * Marks the current position of the MBB + * @return this object + */ + @Override + public MultiByteBuff mark() { + this.markedItemIndex = this.curItemIndex; + this.curItem.mark(); + return this; + } + + /** + * Similar to {@link ByteBuffer}.reset(), ensures that this MBB + * is reset back to last marked position. + * @return This MBB + */ + @Override + public MultiByteBuff reset() { + // when the buffer is moved to the next one.. the reset should happen on the previous marked + // item and the new one should be taken as the base + if (this.markedItemIndex < 0) throw new InvalidMarkException(); + ByteBuffer markedItem = this.items[this.markedItemIndex]; + markedItem.reset(); + this.curItem = markedItem; + // All items after the marked position upto the current item should be reset to 0 + for (int i = this.curItemIndex; i > this.markedItemIndex; i--) { + this.items[i].position(0); + } + this.curItemIndex = this.markedItemIndex; + return this; + } + + /** + * Returns the number of elements between the current position and the + * limit. + * @return the remaining elements in this MBB + */ + @Override + public int remaining() { + int remain = 0; + for (int i = curItemIndex; i < items.length; i++) { + remain += items[i].remaining(); + } + return remain; + } + + /** + * Returns true if there are elements between the current position and the limt + * @return true if there are elements, false otherwise + */ + @Override + public final boolean hasRemaining() { + return this.curItem.hasRemaining() || this.curItemIndex < this.items.length - 1; + } + + /** + * A relative method that returns byte at the current position. Increments the + * current position by the size of a byte. + * @return the byte at the current position + */ + @Override + public byte get() { + if (this.curItem.remaining() == 0) { + if (items.length - 1 == this.curItemIndex) { + // means cur item is the last one and we wont be able to read a long. Throw exception + throw new BufferUnderflowException(); + } + this.curItemIndex++; + this.curItem = this.items[this.curItemIndex]; + } + return this.curItem.get(); + } + + /** + * Returns the short value at the current position. Also advances the position by the size + * of short + * + * @return the short value at the current position + */ + @Override + public short getShort() { + int remaining = this.curItem.remaining(); + if (remaining >= Bytes.SIZEOF_SHORT) { + return this.curItem.getShort(); + } + if (remaining == 0) { + if (items.length - 1 == this.curItemIndex) { + // means cur item is the last one and we wont be able to read a long. Throw exception + throw new BufferUnderflowException(); + } + this.curItemIndex++; + this.curItem = this.items[this.curItemIndex]; + return this.curItem.getShort(); + } + short n = 0; + n ^= get() & 0xFF; + n <<= 8; + n ^= get() & 0xFF; + return n; + } + + /** + * Returns the int value at the current position. Also advances the position by the size of int + * + * @return the int value at the current position + */ + @Override + public int getInt() { + int remaining = this.curItem.remaining(); + if (remaining >= Bytes.SIZEOF_INT) { + return this.curItem.getInt(); + } + if (remaining == 0) { + if (items.length - 1 == this.curItemIndex) { + // means cur item is the last one and we wont be able to read a long. Throw exception + throw new BufferUnderflowException(); + } + this.curItemIndex++; + this.curItem = this.items[this.curItemIndex]; + return this.curItem.getInt(); + } + // Get available bytes from this item and remaining from next + int n = 0; + for (int i = 0; i < Bytes.SIZEOF_INT; i++) { + n <<= 8; + n ^= get() & 0xFF; + } + return n; + } + + + /** + * Returns the long value at the current position. Also advances the position by the size of long + * + * @return the long value at the current position + */ + @Override + public long getLong() { + int remaining = this.curItem.remaining(); + if (remaining >= Bytes.SIZEOF_LONG) { + return this.curItem.getLong(); + } + if (remaining == 0) { + if (items.length - 1 == this.curItemIndex) { + // means cur item is the last one and we wont be able to read a long. Throw exception + throw new BufferUnderflowException(); + } + this.curItemIndex++; + this.curItem = this.items[this.curItemIndex]; + return this.curItem.getLong(); + } + // Get available bytes from this item and remaining from next + long l = 0; + for (int i = 0; i < Bytes.SIZEOF_LONG; i++) { + l <<= 8; + l ^= get() & 0xFF; + } + return l; + } + + /** + * Copies the content from this MBB's current position to the byte array and fills it. Also + * advances the position of the MBB by the length of the byte[]. + * @param dst + */ + @Override + public void get(byte[] dst) { + get(dst, 0, dst.length); + } + + /** + * Copies the specified number of bytes from this MBB's current position to the byte[]'s offset. + * Also advances the position of the MBB by the given length. + * @param dst + * @param offset within the current array + * @param length upto which the bytes to be copied + */ + @Override + public void get(byte[] dst, int offset, int length) { + while (length > 0) { + int toRead = Math.min(length, this.curItem.remaining()); + ByteBufferUtils.copyFromBufferToArray(dst, this.curItem, this.curItem.position(), offset, + toRead); + this.curItem.position(this.curItem.position() + toRead); + length -= toRead; + if (length == 0) + break; + this.curItemIndex++; + this.curItem = this.items[this.curItemIndex]; + offset += toRead; + } + } + + /** + * Marks the limit of this MBB. + * @param limit + * @return This MBB + */ + @Override + public MultiByteBuff limit(int limit) { + this.limit = limit; + // Normally the limit will try to limit within the last BB item + int limitedIndexBegin = this.itemBeginPos[this.limitedItemIndex]; + if (limit >= limitedIndexBegin && limit < this.itemBeginPos[this.limitedItemIndex + 1]) { + this.items[this.limitedItemIndex].limit(limit - limitedIndexBegin); + return this; + } + int itemIndex = getItemIndex(limit); + int beginOffset = this.itemBeginPos[itemIndex]; + int offsetInItem = limit - beginOffset; + ByteBuffer item = items[itemIndex]; + item.limit(offsetInItem); + for (int i = this.limitedItemIndex; i < itemIndex; i++) { + this.items[i].limit(this.items[i].capacity()); + } + this.limitedItemIndex = itemIndex; + for (int i = itemIndex + 1; i < this.items.length; i++) { + this.items[i].limit(this.items[i].position()); + } + return this; + } + + /** + * Returns the limit of this MBB + * @return limit of the MBB + */ + @Override + public int limit() { + return this.limit; + } + + /** + * Returns an MBB which is a sliced version of this MBB. The position, limit and mark + * of the new MBB will be independent than that of the original MBB. + * The content of the new MBB will start at this MBB's current position + * @return a sliced MBB + */ + @Override + public MultiByteBuff slice() { + ByteBuffer[] copy = new ByteBuffer[this.limitedItemIndex - this.curItemIndex + 1]; + for (int i = curItemIndex, j = 0; i <= this.limitedItemIndex; i++, j++) { + copy[j] = this.items[i].slice(); + } + return new MultiByteBuff(copy); + } + + /** + * Returns an MBB which is a duplicate version of this MBB. The position, limit and mark + * of the new MBB will be independent than that of the original MBB. + * The content of the new MBB will start at this MBB's current position + * The position, limit and mark of the new MBB would be identical to this MBB in terms of + * values. + * @return a sliced MBB + */ + @Override + public MultiByteBuff duplicate() { + ByteBuffer[] itemsCopy = new ByteBuffer[this.items.length]; + for (int i = 0; i < this.items.length; i++) { + itemsCopy[i] = items[i].duplicate(); + } + return new MultiByteBuff(itemsCopy, this.itemBeginPos, this.limit, this.limitedItemIndex, + this.curItemIndex, this.markedItemIndex); + } + + /** + * Writes a byte to this MBB at the current position and increments the position + * @param b + * @return this object + */ + @Override + public MultiByteBuff put(byte b) { + if (this.curItem.remaining() == 0) { + if (this.curItemIndex == this.items.length - 1) { + throw new BufferOverflowException(); + } + this.curItemIndex++; + this.curItem = this.items[this.curItemIndex]; + } + this.curItem.put(b); + return this; + } + + /** + * Writes a byte to this MBB at the given index + * @param index + * @param b + * @return this object + */ + @Override + public MultiByteBuff put(int index, byte b) { + int itemIndex = getItemIndex(limit); + ByteBuffer item = items[itemIndex]; + item.put(index - itemBeginPos[itemIndex], b); + return this; + } + + /** + * Copies from a src MBB to this MBB. + * @param offset the position in this MBB to which the copy should happen + * @param src the src MBB + * @param srcOffset the offset in the src MBB from where the elements should be read + * @param length the length upto which the copy should happen + */ + @Override + public MultiByteBuff put(int offset, ByteBuff src, int srcOffset, int length) { + int destItemIndex = getItemIndex(offset); + int srcItemIndex = getItemIndex(srcOffset); + ByteBuffer destItem = this.items[destItemIndex]; + offset = offset - this.itemBeginPos[destItemIndex]; + + ByteBuffer srcItem = getItemByteBuffer(src, srcItemIndex); + srcOffset = srcOffset - this.itemBeginPos[srcItemIndex]; + int toRead, toWrite, toMove; + while (length > 0) { + toWrite = destItem.limit() - offset; + toRead = srcItem.limit() - srcOffset; + toMove = Math.min(length, Math.min(toRead, toWrite)); + ByteBufferUtils.copyFromBufferToBuffer(srcItem, destItem, srcOffset, offset, toMove); + length -= toMove; + if (length == 0) break; + if (toRead < toWrite) { + srcItem = getItemByteBuffer(src, ++srcItemIndex); + srcOffset = 0; + offset += toMove; + } else if (toRead > toWrite) { + destItem = this.items[++destItemIndex]; + offset = 0; + srcOffset += toMove; + } else { + // toRead = toWrite case + srcItem = getItemByteBuffer(src, ++srcItemIndex); + srcOffset = 0; + destItem = this.items[++destItemIndex]; + offset = 0; + } + } + return this; + } + + private static ByteBuffer getItemByteBuffer(ByteBuff buf, int index) { + return (buf instanceof SingleByteBuff) ? ((SingleByteBuff) buf).getEnclosingByteBuffer() + : ((MultiByteBuff) buf).items[index]; + } + + /** + * Writes an int to this MBB at its current position. Also advances the position by size of int + * @param val Int value to write + * @return this object + */ + @Override + public MultiByteBuff putInt(int val) { + if (this.curItem.remaining() >= Bytes.SIZEOF_INT) { + this.curItem.putInt(val); + return this; + } + if (this.curItemIndex == this.items.length - 1) { + throw new BufferOverflowException(); + } + // During read, we will read as byte by byte for this case. So just write in Big endian + put(int3(val)); + put(int2(val)); + put(int1(val)); + put(int0(val)); + return this; + } + + private static byte int3(int x) { + return (byte) (x >> 24); + } + + private static byte int2(int x) { + return (byte) (x >> 16); + } + + private static byte int1(int x) { + return (byte) (x >> 8); + } + + private static byte int0(int x) { + return (byte) (x); + } + + /** + * Copies from the given byte[] to this MBB + * @param src + * @return this MBB + */ + @Override + public final MultiByteBuff put(byte[] src) { + return put(src, 0, src.length); + } + + /** + * Copies from the given byte[] to this MBB + * @param src + * @param offset the position in the byte array from which the copy should be done + * @param length the length upto which the copy should happen + * @return this MBB + */ + @Override + public MultiByteBuff put(byte[] src, int offset, int length) { + if (this.curItem.remaining() >= length) { + ByteBufferUtils.copyFromArrayToBuffer(this.curItem, src, offset, length); + return this; + } + int end = offset + length; + for (int i = offset; i < end; i++) { + this.put(src[i]); + } + return this; + } + + + /** + * Writes a long to this MBB at its current position. Also advances the position by size of long + * @param val Long value to write + * @return this object + */ + @Override + public MultiByteBuff putLong(long val) { + if (this.curItem.remaining() >= Bytes.SIZEOF_LONG) { + this.curItem.putLong(val); + return this; + } + if (this.curItemIndex == this.items.length - 1) { + throw new BufferOverflowException(); + } + // During read, we will read as byte by byte for this case. So just write in Big endian + put(long7(val)); + put(long6(val)); + put(long5(val)); + put(long4(val)); + put(long3(val)); + put(long2(val)); + put(long1(val)); + put(long0(val)); + return this; + } + + private static byte long7(long x) { + return (byte) (x >> 56); + } + + private static byte long6(long x) { + return (byte) (x >> 48); + } + + private static byte long5(long x) { + return (byte) (x >> 40); + } + + private static byte long4(long x) { + return (byte) (x >> 32); + } + + private static byte long3(long x) { + return (byte) (x >> 24); + } + + private static byte long2(long x) { + return (byte) (x >> 16); + } + + private static byte long1(long x) { + return (byte) (x >> 8); + } + + private static byte long0(long x) { + return (byte) (x); + } + + /** + * Jumps the current position of this MBB by specified length. + * @param length + */ + @Override + public MultiByteBuff skip(int length) { + // Get available bytes from this item and remaining from next + int jump = 0; + while (true) { + jump = this.curItem.remaining(); + if (jump >= length) { + this.curItem.position(this.curItem.position() + length); + break; + } + this.curItem.position(this.curItem.position() + jump); + length -= jump; + this.curItemIndex++; + this.curItem = this.items[this.curItemIndex]; + } + return this; + } + + /** + * Jumps back the current position of this MBB by specified length. + * @param length + */ + @Override + public MultiByteBuff moveBack(int length) { + while (length != 0) { + if (length > curItem.position()) { + length -= curItem.position(); + this.curItem.position(0); + this.curItemIndex--; + this.curItem = this.items[curItemIndex]; + } else { + this.curItem.position(curItem.position() - length); + break; + } + } + return this; + } + + /** + * Returns bytes from current position till length specified, as a single ByteBuffer. When all + * these bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item + * as such will be returned. So users are warned not to change the position or limit of this + * returned ByteBuffer. The position of the returned byte buffer is at the begin of the required + * bytes. When the required bytes happen to span across multiple ByteBuffers, this API will copy + * the bytes to a newly created ByteBuffer of required size and return that. + * + * @param length number of bytes required. + * @return bytes from current position till length specified, as a single ByteButter. + */ + @Override + public ByteBuffer asSubByteBuffer(int length) { + if (this.curItem.remaining() >= length) { + return this.curItem; + } + int offset = 0; + byte[] dupB = new byte[length]; + int locCurItemIndex = curItemIndex; + ByteBuffer locCurItem = curItem; + while (length > 0) { + int toRead = Math.min(length, locCurItem.remaining()); + ByteBufferUtils + .copyFromBufferToArray(dupB, locCurItem, locCurItem.position(), offset, toRead); + length -= toRead; + if (length == 0) + break; + locCurItemIndex++; + locCurItem = this.items[locCurItemIndex]; + offset += toRead; + } + return ByteBuffer.wrap(dupB); + } + + /** + * Returns bytes from given offset till length specified, as a single ByteBuffer. When all these + * bytes happen to be in a single ByteBuffer, which this object wraps, that ByteBuffer item as + * such will be returned (with offset in this ByteBuffer where the bytes starts). So users are + * warned not to change the position or limit of this returned ByteBuffer. When the required bytes + * happen to span across multiple ByteBuffers, this API will copy the bytes to a newly created + * ByteBuffer of required size and return that. + * + * @param offset the offset in this MBB from where the subBuffer should be created + * @param length the length of the subBuffer + * @param pair a pair that will have the bytes from the current position till length specified, as + * a single ByteBuffer and offset in that Buffer where the bytes starts. The method would + * set the values on the pair that is passed in by the caller + */ + @Override + public void asSubByteBuffer(int offset, int length, Pair<ByteBuffer, Integer> pair) { + if (this.itemBeginPos[this.curItemIndex] <= offset) { + int relOffsetInCurItem = offset - this.itemBeginPos[this.curItemIndex]; + if (this.curItem.limit() - relOffsetInCurItem >= length) { + pair.setFirst(this.curItem); + pair.setSecond(relOffsetInCurItem); + return; + } + } + int itemIndex = getItemIndex(offset); + ByteBuffer item = this.items[itemIndex]; + offset = offset - this.itemBeginPos[itemIndex]; + if (item.limit() - offset >= length) { + pair.setFirst(item); + pair.setSecond(offset); + return; + } + byte[] dst = new byte[length]; + int destOffset = 0; + while (length > 0) { + int toRead = Math.min(length, item.limit() - offset); + ByteBufferUtils.copyFromBufferToArray(dst, item, offset, destOffset, toRead); + length -= toRead; + if (length == 0) break; + itemIndex++; + item = this.items[itemIndex]; + destOffset += toRead; + offset = 0; + } + pair.setFirst(ByteBuffer.wrap(dst)); + pair.setSecond(0); + return; + } + + /** + * Copies the content from an this MBB to a ByteBuffer + * @param out the ByteBuffer to which the copy has to happen + * @param sourceOffset the offset in the MBB from which the elements has + * to be copied + * @param length the length in the MBB upto which the elements has to be copied + */ + @Override + public void get(ByteBuffer out, int sourceOffset, + int length) { + // Not used from real read path actually. So not going with + // optimization + for (int i = 0; i < length; ++i) { + out.put(this.get(sourceOffset + i)); + } + } + + /** + * Copy the content from this MBB to a byte[] based on the given offset and + * length + * + * @param offset + * the position from where the copy should start + * @param length + * the length upto which the copy has to be done + * @return byte[] with the copied contents from this MBB. + */ + @Override + public byte[] toBytes(int offset, int length) { + byte[] output = new byte[length]; + int itemIndex = getItemIndex(offset); + ByteBuffer item = this.items[itemIndex]; + int toRead = item.limit() - offset; + int destinationOffset = 0; + while (length > 0) { + toRead = Math.min(length, toRead); + ByteBufferUtils.copyFromBufferToArray(output, item, offset, destinationOffset, toRead); + length -= toRead; + if (length == 0) + break; + destinationOffset += toRead; + offset = 0; + item = items[++itemIndex]; + toRead = item.remaining(); + } + return output; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof MultiByteBuff)) return false; + if (this == obj) return true; + MultiByteBuff that = (MultiByteBuff) obj; + if (this.capacity() != that.capacity()) return false; + if (ByteBuff.compareTo(this, this.position(), this.limit(), that, that.position(), + that.limit()) == 0) { + return true; + } + return false; + } + + @Override + public int hashCode() { + int hash = 0; + for (ByteBuffer b : this.items) { + hash += b.hashCode(); + } + return hash; + } +}
