This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 6d953350c01c4f02b70b84804664d42183488ef2 Author: huzheng <[email protected]> AuthorDate: Fri Mar 8 16:46:06 2019 +0800 HBASE-22016 Rewrite the block reading methods by using hbase.nio.ByteBuff --- .../apache/hadoop/hbase/io/hfile/BlockIOUtils.java | 223 ++++++++++++++ .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 337 ++++++++------------- ...ckPositionalRead.java => TestBlockIOUtils.java} | 122 ++++++-- .../apache/hadoop/hbase/io/hfile/TestChecksum.java | 14 +- 4 files changed, 453 insertions(+), 243 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java new file mode 100644 index 0000000..dbd5b2e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockIOUtils.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.io.hfile; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.io.IOUtils; +import org.apache.yetus.audience.InterfaceAudience; + [email protected] +class BlockIOUtils { + + static boolean isByteBufferReadable(FSDataInputStream is) { + InputStream cur = is.getWrappedStream(); + for (;;) { + if ((cur instanceof FSDataInputStream)) { + cur = ((FSDataInputStream) cur).getWrappedStream(); + } else { + break; + } + } + return cur instanceof ByteBufferReadable; + } + + /** + * Read length bytes into ByteBuffers directly. + * @param buf the destination {@link ByteBuff} + * @param dis the HDFS input stream which implement the ByteBufferReadable interface. + * @param length bytes to read. + * @throws IOException exception to throw if any error happen + */ + static void readFully(ByteBuff buf, FSDataInputStream dis, int length) throws IOException { + if (!isByteBufferReadable(dis)) { + // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to + // the destination ByteBuff. 
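+      // (This fallback costs an extra on-heap allocation and copy per call; the ByteBufferReadable
+      // path below reads into the destination ByteBuffers directly.)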
+      byte[] heapBuf = new byte[length]; + IOUtils.readFully(dis, heapBuf, 0, length); + copyToByteBuff(heapBuf, 0, length, buf); + return; + } + ByteBuffer[] buffers = buf.nioByteBuffers(); + int remain = length; + int idx = 0; + ByteBuffer cur = buffers[idx]; + while (remain > 0) { + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException( + "Not enough ByteBuffers to read the remaining " + remain + " " + "bytes"); + } + cur = buffers[idx]; + } + cur.limit(cur.position() + Math.min(remain, cur.remaining())); + int bytesRead = dis.read(cur); + if (bytesRead < 0) { + throw new IOException( + "Premature EOF from inputStream, but still need " + remain + " " + "bytes"); + } + remain -= bytesRead; + } + } + + /** + * Read from an input stream at least <code>necessaryLen</code> and if possible, + * <code>extraLen</code> also if available. Analogous to + * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a number of "extra" + * bytes to also optionally read. + * @param in the input stream to read from + * @param buf the buffer to read into + * @param bufOffset the destination offset in the buffer + * @param necessaryLen the number of bytes that are absolutely necessary to read + * @param extraLen the number of extra bytes that would be nice to read + * @return true if succeeded reading the extra bytes + * @throws IOException if failed to read the necessary bytes + */ + private static boolean readWithExtraOnHeap(InputStream in, byte[] buf, int bufOffset, + int necessaryLen, int extraLen) throws IOException { + int bytesRemaining = necessaryLen + extraLen; + while (bytesRemaining > 0) { + int ret = in.read(buf, bufOffset, bytesRemaining); + if (ret < 0) { + if (bytesRemaining <= extraLen) { + // We could not read the "extra data", but that is OK. + break; + } + throw new IOException("Premature EOF from inputStream (read " + "returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, " + "successfully read " + (necessaryLen + extraLen - bytesRemaining)); + } + bufOffset += ret; + bytesRemaining -= ret; + } + return bytesRemaining <= 0; + } + + /** + * Read bytes into ByteBuffers directly; the buffers will contain either only the necessaryLen + * bytes or the extraLen bytes as well, depending on how many bytes the last read returned. + * @param buf the destination {@link ByteBuff}. + * @param dis input stream to read. + * @param necessaryLen bytes which we must read + * @param extraLen bytes which we may read + * @return true if we have also read the extraLen bytes into our ByteBuffers, false if we have + * only read the necessaryLen bytes so far. + * @throws IOException if failed to read the necessary bytes. + */ + static boolean readWithExtra(ByteBuff buf, FSDataInputStream dis, int necessaryLen, int extraLen) + throws IOException { + if (!isByteBufferReadable(dis)) { + // If InputStream does not support the ByteBuffer read, just read to heap and copy bytes to + // the destination ByteBuff. 
+ byte[] heapBuf = new byte[necessaryLen + extraLen]; + boolean ret = readWithExtraOnHeap(dis, heapBuf, 0, necessaryLen, extraLen); + copyToByteBuff(heapBuf, 0, heapBuf.length, buf); + return ret; + } + ByteBuffer[] buffers = buf.nioByteBuffers(); + int bytesRead = 0; + int remain = necessaryLen + extraLen; + int idx = 0; + ByteBuffer cur = buffers[idx]; + while (bytesRead < necessaryLen) { + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes"); + } + cur = buffers[idx]; + } + cur.limit(cur.position() + Math.min(remain, cur.remaining())); + int ret = dis.read(cur); + if (ret < 0) { + throw new IOException("Premature EOF from inputStream (read returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, successfully read " + bytesRead); + } + bytesRead += ret; + remain -= ret; + } + return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); + } + + /** + * Read from an input stream at least <code>necessaryLen</code> and if possible, + * <code>extraLen</code> also if available. Analogous to + * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses positional read and + * specifies a number of "extra" bytes that would be desirable but not absolutely necessary to + * read. + * @param buff ByteBuff to read into. + * @param dis the input stream to read from + * @param position the position within the stream from which to start reading + * @param necessaryLen the number of bytes that are absolutely necessary to read + * @param extraLen the number of extra bytes that would be nice to read + * @return true if and only if extraLen is > 0 and reading those extra bytes was successful + * @throws IOException if failed to read the necessary bytes + */ + static boolean preadWithExtra(ByteBuff buff, FSDataInputStream dis, long position, + int necessaryLen, int extraLen) throws IOException { + int remain = necessaryLen + extraLen; + byte[] buf = new byte[remain]; + int bytesRead = 0; + while (bytesRead < necessaryLen) { + int ret = dis.read(position + bytesRead, buf, bytesRead, remain); + if (ret < 0) { + throw new IOException("Premature EOF from inputStream (positional read returned " + ret + + ", was trying to read " + necessaryLen + " necessary bytes and " + extraLen + + " extra bytes, successfully read " + bytesRead); + } + bytesRead += ret; + remain -= ret; + } + // Copy the bytes from on-heap bytes[] to ByteBuffer[] now, and after resolving HDFS-3246, we + // will read the bytes to ByteBuffer[] directly without allocating any on-heap byte[]. + // TODO I keep the bytes copy here, because I want to abstract the ByteBuffer[] + // preadWithExtra method for the upper layer, only need to refactor this method if the + // ByteBuffer pread is OK. 
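+    // (Until HDFS-3246 lands, every pread here allocates an on-heap byte[] of necessaryLen + extraLen
+    // and copies the bytes once into the destination ByteBuff below.)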
+ copyToByteBuff(buf, 0, bytesRead, buff); + return (extraLen > 0) && (bytesRead == necessaryLen + extraLen); + } + + private static int copyToByteBuff(byte[] buf, int offset, int len, ByteBuff out) + throws IOException { + if (offset < 0 || len < 0 || offset + len > buf.length) { + throw new IOException("Invalid offset=" + offset + " and len=" + len + ", cap=" + buf.length); + } + ByteBuffer[] buffers = out.nioByteBuffers(); + int idx = 0, remain = len, copyLen; + ByteBuffer cur = buffers[idx]; + while (remain > 0) { + while (!cur.hasRemaining()) { + if (++idx >= buffers.length) { + throw new IOException("Not enough ByteBuffers to read the reminding " + remain + "bytes"); + } + cur = buffers[idx]; + } + copyLen = Math.min(cur.remaining(), remain); + cur.put(buf, offset, copyLen); + remain -= copyLen; + offset += copyLen; + } + return len; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 91e63fd..4773678 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -21,7 +21,6 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; @@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.io.IOUtils; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; @@ -280,9 +278,7 @@ public class HFileBlock implements Cacheable { boolean usesChecksum = buf.get() == (byte) 1; long offset = buf.getLong(); int nextBlockOnDiskSize = buf.getInt(); - HFileBlock hFileBlock = - new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null); - return hFileBlock; + return new HFileBlock(newByteBuff, usesChecksum, memType, offset, nextBlockOnDiskSize, null); } @Override @@ -315,9 +311,9 @@ public class HFileBlock implements Cacheable { * param. */ private HFileBlock(HFileBlock that, boolean bufCopy) { - init(that.blockType, that.onDiskSizeWithoutHeader, - that.uncompressedSizeWithoutHeader, that.prevBlockOffset, - that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize, that.fileContext); + init(that.blockType, that.onDiskSizeWithoutHeader, that.uncompressedSizeWithoutHeader, + that.prevBlockOffset, that.offset, that.onDiskDataSizeWithHeader, that.nextBlockOnDiskSize, + that.fileContext); if (bufCopy) { this.buf = new SingleByteBuff(ByteBuffer.wrap(that.buf.toBytes(0, that.buf.limit()))); } else { @@ -331,6 +327,7 @@ public class HFileBlock implements Cacheable { * and is sitting in a byte buffer and we want to stuff the block into cache. * * <p>TODO: The caller presumes no checksumming + * <p>TODO: HFile block writer can also off-heap ? </p> * required of this block instance since going into cache; checksum already verified on * underlying block data pulled in from filesystem. Is that correct? What if cache is SSD? 
* @@ -349,8 +346,8 @@ public class HFileBlock implements Cacheable { int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer b, boolean fillHeader, long offset, final int nextBlockOnDiskSize, int onDiskDataSizeWithHeader, HFileContext fileContext) { - init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, - prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); + init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset, + onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); this.buf = new SingleByteBuff(b); if (fillHeader) { overwriteHeader(); @@ -366,7 +363,8 @@ public class HFileBlock implements Cacheable { * @param buf Has header, content, and trailing checksums if present. */ HFileBlock(ByteBuff buf, boolean usesHBaseChecksum, MemoryType memType, final long offset, - final int nextBlockOnDiskSize, HFileContext fileContext) throws IOException { + final int nextBlockOnDiskSize, HFileContext fileContext) + throws IOException { buf.rewind(); final BlockType blockType = BlockType.read(buf); final int onDiskSizeWithoutHeader = buf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX); @@ -394,8 +392,8 @@ public class HFileBlock implements Cacheable { } fileContext = fileContextBuilder.build(); assert usesHBaseChecksum == fileContext.isUseHBaseChecksum(); - init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, - prevBlockOffset, offset, onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); + init(blockType, onDiskSizeWithoutHeader, uncompressedSizeWithoutHeader, prevBlockOffset, offset, + onDiskDataSizeWithHeader, nextBlockOnDiskSize, fileContext); this.memType = memType; this.offset = offset; this.buf = buf; @@ -406,9 +404,8 @@ public class HFileBlock implements Cacheable { * Called from constructors. */ private void init(BlockType blockType, int onDiskSizeWithoutHeader, - int uncompressedSizeWithoutHeader, long prevBlockOffset, - long offset, int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, - HFileContext fileContext) { + int uncompressedSizeWithoutHeader, long prevBlockOffset, long offset, + int onDiskDataSizeWithHeader, final int nextBlockOnDiskSize, HFileContext fileContext) { this.blockType = blockType; this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; this.uncompressedSizeWithoutHeader = uncompressedSizeWithoutHeader; @@ -425,10 +422,9 @@ public class HFileBlock implements Cacheable { * @param verifyChecksum true if checksum verification is in use. * @return Size of the block with header included. 
*/ - private static int getOnDiskSizeWithHeader(final ByteBuffer headerBuf, + private static int getOnDiskSizeWithHeader(final ByteBuff headerBuf, boolean verifyChecksum) { - return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + - headerSize(verifyChecksum); + return headerBuf.getInt(Header.ON_DISK_SIZE_WITHOUT_HEADER_INDEX) + headerSize(verifyChecksum); } /** @@ -651,9 +647,10 @@ public class HFileBlock implements Cacheable { ByteBuff dup = this.buf.duplicate(); dup.position(this.headerSize()); dup = dup.slice(); + ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), - unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), - dup); + unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), dup); + return unpacked; } @@ -667,15 +664,14 @@ public class HFileBlock implements Cacheable { int headerSize = headerSize(); int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + cksumBytes; - // TODO we need consider allocating offheap here? - ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); + ByteBuff newBuf = new SingleByteBuff(ByteBuffer.allocate(capacityNeeded)); // Copy header bytes into newBuf. // newBuf is HBB so no issue in calling array() buf.position(0); - buf.get(newBuf.array(), newBuf.arrayOffset(), headerSize); + newBuf.put(0, buf, 0, headerSize); - buf = new SingleByteBuff(newBuf); + buf = newBuf; // set limit to exclude next block's header buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); } @@ -692,17 +688,6 @@ public class HFileBlock implements Cacheable { return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; } - /** An additional sanity-check in case no compression or encryption is being used. */ - @VisibleForTesting - void sanityCheckUncompressedSize() throws IOException { - if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { - throw new IOException("Using no compression but " - + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " - + "uncompressedSizeWithoutHeader=" + uncompressedSizeWithoutHeader - + ", numChecksumbytes=" + totalChecksumBytes()); - } - } - /** * Cannot be {@link #UNSET}. Must be a legitimate value. Used re-making the {@link BlockCacheKey} when * block is returned to the cache. @@ -748,82 +733,6 @@ public class HFileBlock implements Cacheable { } /** - * Read from an input stream at least <code>necessaryLen</code> and if possible, - * <code>extraLen</code> also if available. Analogous to - * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but specifies a - * number of "extra" bytes to also optionally read. - * - * @param in the input stream to read from - * @param buf the buffer to read into - * @param bufOffset the destination offset in the buffer - * @param necessaryLen the number of bytes that are absolutely necessary to read - * @param extraLen the number of extra bytes that would be nice to read - * @return true if succeeded reading the extra bytes - * @throws IOException if failed to read the necessary bytes - */ - static boolean readWithExtra(InputStream in, byte[] buf, - int bufOffset, int necessaryLen, int extraLen) throws IOException { - int bytesRemaining = necessaryLen + extraLen; - while (bytesRemaining > 0) { - int ret = in.read(buf, bufOffset, bytesRemaining); - if (ret == -1 && bytesRemaining <= extraLen) { - // We could not read the "extra data", but that is OK. 
- break; - } - if (ret < 0) { - throw new IOException("Premature EOF from inputStream (read " - + "returned " + ret + ", was trying to read " + necessaryLen - + " necessary bytes and " + extraLen + " extra bytes, " - + "successfully read " - + (necessaryLen + extraLen - bytesRemaining)); - } - bufOffset += ret; - bytesRemaining -= ret; - } - return bytesRemaining <= 0; - } - - /** - * Read from an input stream at least <code>necessaryLen</code> and if possible, - * <code>extraLen</code> also if available. Analogous to - * {@link IOUtils#readFully(InputStream, byte[], int, int)}, but uses - * positional read and specifies a number of "extra" bytes that would be - * desirable but not absolutely necessary to read. - * - * @param in the input stream to read from - * @param position the position within the stream from which to start reading - * @param buf the buffer to read into - * @param bufOffset the destination offset in the buffer - * @param necessaryLen the number of bytes that are absolutely necessary to - * read - * @param extraLen the number of extra bytes that would be nice to read - * @return true if and only if extraLen is > 0 and reading those extra bytes - * was successful - * @throws IOException if failed to read the necessary bytes - */ - @VisibleForTesting - static boolean positionalReadWithExtra(FSDataInputStream in, - long position, byte[] buf, int bufOffset, int necessaryLen, int extraLen) - throws IOException { - int bytesRemaining = necessaryLen + extraLen; - int bytesRead = 0; - while (bytesRead < necessaryLen) { - int ret = in.read(position, buf, bufOffset, bytesRemaining); - if (ret < 0) { - throw new IOException("Premature EOF from inputStream (positional read " - + "returned " + ret + ", was trying to read " + necessaryLen - + " necessary bytes and " + extraLen + " extra bytes, " - + "successfully read " + bytesRead); - } - position += ret; - bufOffset += ret; - bytesRemaining -= ret; - bytesRead += ret; - } - return bytesRead != necessaryLen && bytesRemaining <= 0; - } - - /** * Unified version 2 {@link HFile} block writer. The intended usage pattern * is as follows: * <ol> @@ -988,18 +897,6 @@ public class HFileBlock implements Cacheable { } /** - * Returns the stream for the user to write to. The block writer takes care - * of handling compression and buffering for caching on write. Can only be - * called in the "writing" state. - * - * @return the data output stream for the user to write to - */ - DataOutputStream getUserDataStream() { - expectState(State.WRITING); - return userDataStream; - } - - /** * Transitions the block writer from the "writing" state to the "block * ready" state. Does nothing if a block is already finished. */ @@ -1261,11 +1158,9 @@ public class HFileBlock implements Cacheable { } /** - * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is - * needed for storing packed blocks in the block cache. Expects calling semantics identical to - * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data, - * Does not include checksum data. - * + * Clones the header followed by the on-disk (compressed/encoded/encrypted) data. This is needed + * for storing packed blocks in the block cache. Returns only the header and data, Does not + * include checksum data. 
* @return Returns a copy of block bytes for caching on write */ private ByteBuffer cloneOnDiskBufferWithHeader() { @@ -1321,11 +1216,10 @@ public class HFileBlock implements Cacheable { .withIncludesMvcc(fileContext.isIncludesMvcc()) .withIncludesTags(fileContext.isIncludesTags()) .build(); - return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), + return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), getUncompressedSizeWithoutHeader(), prevOffset, - cacheConf.shouldCacheCompressed(blockType.getCategory())? - cloneOnDiskBufferWithHeader() : - cloneUncompressedBufferWithHeader(), + cacheConf.shouldCacheCompressed(blockType.getCategory()) ? cloneOnDiskBufferWithHeader() + : cloneUncompressedBufferWithHeader(), FILL_HEADER, startOffset, UNSET, onDiskBlockBytesWithHeader.size() + onDiskChecksum.length, newContext); } @@ -1415,8 +1309,8 @@ public class HFileBlock implements Cacheable { */ private static class PrefetchedHeader { long offset = -1; - byte [] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; - final ByteBuffer buf = ByteBuffer.wrap(header, 0, HConstants.HFILEBLOCK_HEADER_SIZE); + byte[] header = new byte[HConstants.HFILEBLOCK_HEADER_SIZE]; + final ByteBuff buf = new SingleByteBuff(ByteBuffer.wrap(header, 0, header.length)); @Override public String toString() { @@ -1479,11 +1373,11 @@ public class HFileBlock implements Cacheable { } /** - * A constructor that reads files with the latest minor version. - * This is used by unit tests only. + * A constructor that reads files with the latest minor version. This is used by unit tests + * only. */ FSReaderImpl(FSDataInputStream istream, long fileSize, HFileContext fileContext) - throws IOException { + throws IOException { this(new FSDataInputStreamWrapper(istream), fileSize, null, null, fileContext); } @@ -1520,60 +1414,49 @@ public class HFileBlock implements Cacheable { } /** - * Does a positional read or a seek and read into the given buffer. Returns - * the on-disk size of the next block, or -1 if it could not be read/determined; e.g. EOF. - * + * Does a positional read or a seek and read into the given byte buffer. We need to take care + * that we call {@link ByteBuff#release()} on every exit path to deallocate the ByteBuffers, + * otherwise a memory leak may happen. * @param dest destination buffer - * @param destOffset offset into the destination buffer at where to put the bytes we read * @param size size of read * @param peekIntoNextBlock whether to read the next block's on-disk size * @param fileOffset position in the stream to read at * @param pread whether we should do a positional read * @param istream The input source of data - * @return the on-disk size of the next block with header size included, or - * -1 if it could not be determined; if not -1, the <code>dest</code> INCLUDES the - * next header - * @throws IOException + * @return true to indicate the destination buffer includes the next block header, otherwise it + * only includes the current block data without the next block header. + * @throws IOException if any IO error happens. */ - @VisibleForTesting - protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size, - boolean peekIntoNextBlock, long fileOffset, boolean pread) - throws IOException { - if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) { - // We are asked to read the next block's header as well, but there is - // not enough room in the array. 
- throw new IOException("Attempted to read " + size + " bytes and " + hdrSize + - " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset); - } - + protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size, + boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { if (!pread) { // Seek + read. Better for scanning. HFileUtil.seekOnMultipleSources(istream, fileOffset); - // TODO: do we need seek time latencies? long realOffset = istream.getPos(); if (realOffset != fileOffset) { - throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size + - " bytes, but pos=" + realOffset + " after seek"); + throw new IOException("Tried to seek to " + fileOffset + " to read " + size + + " bytes, but pos=" + realOffset + " after seek"); } - if (!peekIntoNextBlock) { - IOUtils.readFully(istream, dest, destOffset, size); - return -1; + BlockIOUtils.readFully(dest, istream, size); + return false; } - // Try to read the next block header. - if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) { - return -1; + // Try to read the next block header + if (!BlockIOUtils.readWithExtra(dest, istream, size, hdrSize)) { + // did not read the next block header. + return false; } } else { // Positional read. Better for random reads; or when the streamLock is already locked. int extraSize = peekIntoNextBlock ? hdrSize : 0; - if (!positionalReadWithExtra(istream, fileOffset, dest, destOffset, size, extraSize)) { - return -1; + if (!BlockIOUtils.preadWithExtra(dest, istream, fileOffset, size, extraSize)) { + // did not read the next block header. + return false; } } assert peekIntoNextBlock; - return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize; + return true; } /** @@ -1672,7 +1555,7 @@ public class HFileBlock implements Cacheable { * is not right. * @throws IOException */ - private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuffer headerBuf, + private void verifyOnDiskSizeMatchesHeader(final int passedIn, final ByteBuff headerBuf, final long offset, boolean verifyChecksum) throws IOException { // Assert size provided aligns with what is in the header @@ -1691,11 +1574,11 @@ public class HFileBlock implements Cacheable { * we have to backup the stream because we over-read (the next block's header). * @see PrefetchedHeader * @return The cached block header or null if not found. - * @see #cacheNextBlockHeader(long, byte[], int, int) + * @see #cacheNextBlockHeader(long, ByteBuff, int, int) */ - private ByteBuffer getCachedHeader(final long offset) { + private ByteBuff getCachedHeader(final long offset) { PrefetchedHeader ph = this.prefetchedHeader.get(); - return ph != null && ph.offset == offset? ph.buf: null; + return ph != null && ph.offset == offset ? 
ph.buf : null; } /** @@ -1704,13 +1587,24 @@ public class HFileBlock implements Cacheable { * @see PrefetchedHeader */ private void cacheNextBlockHeader(final long offset, - final byte [] header, final int headerOffset, final int headerLength) { + ByteBuff onDiskBlock, int onDiskSizeWithHeader, int headerLength) { PrefetchedHeader ph = new PrefetchedHeader(); ph.offset = offset; - System.arraycopy(header, headerOffset, ph.header, 0, headerLength); + onDiskBlock.get(onDiskSizeWithHeader, ph.header, 0, headerLength); this.prefetchedHeader.set(ph); } + private int getNextBlockOnDiskSize(boolean readNextHeader, ByteBuff onDiskBlock, + int onDiskSizeWithHeader) { + int nextBlockOnDiskSize = -1; + if (readNextHeader) { + nextBlockOnDiskSize = + onDiskBlock.getIntAfterPosition(onDiskSizeWithHeader + BlockType.MAGIC_LENGTH) + + hdrSize; + } + return nextBlockOnDiskSize; + } + /** * Reads a version 2 block. * @@ -1737,7 +1631,7 @@ public class HFileBlock implements Cacheable { // Try and get cached header. Will serve us in rare case where onDiskSizeWithHeaderL is -1 // and will save us having to seek the stream backwards to reread the header we // read the last time through here. - ByteBuffer headerBuf = getCachedHeader(offset); + ByteBuff headerBuf = getCachedHeader(offset); LOG.trace("Reading {} at offset={}, pread={}, verifyChecksum={}, cachedHeader={}, " + "onDiskSizeWithHeader={}", this.fileContext.getHFileName(), offset, pread, verifyChecksum, headerBuf, onDiskSizeWithHeader); @@ -1757,9 +1651,9 @@ public class HFileBlock implements Cacheable { if (LOG.isTraceEnabled()) { LOG.trace("Extra see to get block size!", new RuntimeException()); } - headerBuf = ByteBuffer.allocate(hdrSize); - readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false, - offset, pread); + headerBuf = new SingleByteBuff(ByteBuffer.allocate(hdrSize)); + readAtOffset(is, headerBuf, hdrSize, false, offset, pread); + headerBuf.rewind(); } onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport); } @@ -1770,46 +1664,55 @@ public class HFileBlock implements Cacheable { // says where to start reading. If we have the header cached, then we don't need to read // it again and we can likely read from last place we left off w/o need to backup and reread // the header we read last time through here. - // TODO: Make this ByteBuffer-based. Will make it easier to go to HDFS with BBPool (offheap). - byte [] onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; - int nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, + ByteBuff onDiskBlock = + new SingleByteBuff(ByteBuffer.allocate(onDiskSizeWithHeader + hdrSize)); + boolean initHFileBlockSuccess = false; + try { + if (headerBuf != null) { + onDiskBlock.put(0, headerBuf, 0, hdrSize).position(hdrSize); + } + boolean readNextHeader = readAtOffset(is, onDiskBlock, onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); - if (headerBuf != null) { - // The header has been read when reading the previous block OR in a distinct header-only - // read. Copy to this block's header. - System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); - } else { - headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); - } - // Do a few checks before we go instantiate HFileBlock. 
- assert onDiskSizeWithHeader > this.hdrSize; - verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport); - ByteBuff onDiskBlockByteBuff = - new SingleByteBuff(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader)); - // Verify checksum of the data before using it for building HFileBlock. - if (verifyChecksum && !validateChecksum(offset, onDiskBlockByteBuff, hdrSize)) { - return null; - } - long duration = System.currentTimeMillis() - startTime; - if (updateMetrics) { - HFile.updateReadLatency(duration, pread); - } - // The onDiskBlock will become the headerAndDataBuffer for this block. - // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already - // contains the header of next block, so no need to set next block's header in it. - HFileBlock hFileBlock = new HFileBlock(onDiskBlockByteBuff, checksumSupport, - MemoryType.EXCLUSIVE, offset, nextBlockOnDiskSize, fileContext); - // Run check on uncompressed sizings. - if (!fileContext.isCompressedOrEncrypted()) { - hFileBlock.sanityCheckUncompressed(); - } - LOG.trace("Read {} in {} ns", hFileBlock, duration); - // Cache next block header if we read it for the next time through here. - if (nextBlockOnDiskSize != -1) { - cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), - onDiskBlock, onDiskSizeWithHeader, hdrSize); + onDiskBlock.rewind(); // in case of moving position when copying a cached header + int nextBlockOnDiskSize = + getNextBlockOnDiskSize(readNextHeader, onDiskBlock, onDiskSizeWithHeader); + if (headerBuf == null) { + headerBuf = onDiskBlock.duplicate().position(0).limit(hdrSize); + } + // Do a few checks before we go instantiate HFileBlock. + assert onDiskSizeWithHeader > this.hdrSize; + verifyOnDiskSizeMatchesHeader(onDiskSizeWithHeader, headerBuf, offset, checksumSupport); + ByteBuff curBlock = onDiskBlock.duplicate().limit(onDiskSizeWithHeader); + // Verify checksum of the data before using it for building HFileBlock. + if (verifyChecksum && !validateChecksum(offset, curBlock, hdrSize)) { + return null; + } + long duration = System.currentTimeMillis() - startTime; + if (updateMetrics) { + HFile.updateReadLatency(duration, pread); + } + // The onDiskBlock will become the headerAndDataBuffer for this block. + // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already + // contains the header of next block, so no need to set next block's header in it. + HFileBlock hFileBlock = new HFileBlock(curBlock, checksumSupport, MemoryType.EXCLUSIVE, + offset, nextBlockOnDiskSize, fileContext); + // Run check on uncompressed sizings. + if (!fileContext.isCompressedOrEncrypted()) { + hFileBlock.sanityCheckUncompressed(); + } + LOG.trace("Read {} in {} ns", hFileBlock, duration); + // Cache next block header if we read it for the next time through here. 
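+        // (getCachedHeader(offset) only returns the prefetched header when its offset matches the
+        // next read's offset, so a stale entry is simply ignored.)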
+ if (nextBlockOnDiskSize != -1) { + cacheNextBlockHeader(offset + hFileBlock.getOnDiskSizeWithHeader(), onDiskBlock, + onDiskSizeWithHeader, hdrSize); + } + initHFileBlockSuccess = true; + return hFileBlock; + } finally { + if (!initHFileBlockSuccess) { + onDiskBlock.release(); + } } - return hFileBlock; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockPositionalRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java similarity index 54% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockPositionalRead.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java index a13c868..60180e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockPositionalRead.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockIOUtils.java @@ -17,33 +17,115 @@ */ package org.apache.hadoop.hbase.io.hfile; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import java.io.IOException; +import java.nio.ByteBuffer; + import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.nio.ByteBuff; +import org.apache.hadoop.hbase.nio.MultiByteBuff; +import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; -/** - * Unit test suite covering HFileBlock positional read logic. 
- */ -@Category({IOTests.class, SmallTests.class}) -public class TestHFileBlockPositionalRead { +@Category({ IOTests.class, SmallTests.class }) +public class TestBlockIOUtils { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestHFileBlockPositionalRead.class); + HBaseClassTestRule.forClass(TestBlockIOUtils.class); @Rule public ExpectedException exception = ExpectedException.none(); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @Test + public void testIsByteBufferReadable() throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testIsByteBufferReadable"); + try (FSDataOutputStream out = fs.create(p)) { + out.writeInt(23); + } + try (FSDataInputStream is = fs.open(p)) { + assertFalse(BlockIOUtils.isByteBufferReadable(is)); + } + } + + @Test + public void testReadFully() throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadFully"); + String s = "hello world"; + try (FSDataOutputStream out = fs.create(p)) { + out.writeBytes(s); + } + ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(11)); + try (FSDataInputStream in = fs.open(p)) { + BlockIOUtils.readFully(buf, in, 11); + } + buf.rewind(); + byte[] heapBuf = new byte[s.length()]; + buf.get(heapBuf, 0, heapBuf.length); + assertArrayEquals(Bytes.toBytes(s), heapBuf); + } + + @Test + public void testReadWithExtra() throws IOException { + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path p = new Path(TEST_UTIL.getDataTestDirOnTestFS(), "testReadWithExtra"); + String s = "hello world"; + try (FSDataOutputStream out = fs.create(p)) { + out.writeBytes(s); + } + ByteBuff buf = new SingleByteBuff(ByteBuffer.allocate(8)); + try (FSDataInputStream in = fs.open(p)) { + assertTrue(BlockIOUtils.readWithExtra(buf, in, 6, 2)); + } + buf.rewind(); + byte[] heapBuf = new byte[buf.capacity()]; + buf.get(heapBuf, 0, heapBuf.length); + assertArrayEquals(Bytes.toBytes("hello wo"), heapBuf); + + buf = new MultiByteBuff(ByteBuffer.allocate(4), ByteBuffer.allocate(4), ByteBuffer.allocate(4)); + try (FSDataInputStream in = fs.open(p)) { + assertTrue(BlockIOUtils.readWithExtra(buf, in, 8, 3)); + } + buf.rewind(); + heapBuf = new byte[11]; + buf.get(heapBuf, 0, heapBuf.length); + assertArrayEquals(Bytes.toBytes("hello world"), heapBuf); + + buf.position(0).limit(12); + try (FSDataInputStream in = fs.open(p)) { + try { + BlockIOUtils.readWithExtra(buf, in, 12, 0); + fail("Should only read 11 bytes"); + } catch (IOException e) { + + } + } + } + @Test public void testPositionalReadNoExtra() throws IOException { long position = 0; @@ -52,10 +134,10 @@ public class TestHFileBlockPositionalRead { int extraLen = 0; int totalLen = necessaryLen + extraLen; byte[] buf = new byte[totalLen]; + ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); - boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, - bufOffset, necessaryLen, extraLen); + boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertFalse("Expect false return when no extra bytes requested", ret); verify(in).read(position, buf, bufOffset, totalLen); verifyNoMoreInteractions(in); @@ -69,11 +151,11 @@ public class TestHFileBlockPositionalRead { int extraLen = 0; int totalLen = necessaryLen + 
extraLen; byte[] buf = new byte[totalLen]; + ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); when(in.read(5, buf, 5, 5)).thenReturn(5); - boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, - bufOffset, necessaryLen, extraLen); + boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertFalse("Expect false return when no extra bytes requested", ret); verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(5, buf, 5, 5); @@ -88,10 +170,10 @@ public class TestHFileBlockPositionalRead { int extraLen = 5; int totalLen = necessaryLen + extraLen; byte[] buf = new byte[totalLen]; + ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(totalLen); - boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, - bufOffset, necessaryLen, extraLen); + boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertTrue("Expect true return when reading extra bytes succeeds", ret); verify(in).read(position, buf, bufOffset, totalLen); verifyNoMoreInteractions(in); @@ -105,10 +187,10 @@ public class TestHFileBlockPositionalRead { int extraLen = 5; int totalLen = necessaryLen + extraLen; byte[] buf = new byte[totalLen]; + ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(necessaryLen); - boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, - bufOffset, necessaryLen, extraLen); + boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertFalse("Expect false return when reading extra bytes fails", ret); verify(in).read(position, buf, bufOffset, totalLen); verifyNoMoreInteractions(in); @@ -123,11 +205,11 @@ public class TestHFileBlockPositionalRead { int extraLen = 5; int totalLen = necessaryLen + extraLen; byte[] buf = new byte[totalLen]; + ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(5); when(in.read(5, buf, 5, 10)).thenReturn(10); - boolean ret = HFileBlock.positionalReadWithExtra(in, position, buf, - bufOffset, necessaryLen, extraLen); + boolean ret = BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); assertTrue("Expect true return when reading extra bytes succeeds", ret); verify(in).read(position, buf, bufOffset, totalLen); verify(in).read(5, buf, 5, 10); @@ -142,12 +224,12 @@ public class TestHFileBlockPositionalRead { int extraLen = 0; int totalLen = necessaryLen + extraLen; byte[] buf = new byte[totalLen]; + ByteBuff bb = new SingleByteBuff(ByteBuffer.wrap(buf, 0, totalLen)); FSDataInputStream in = mock(FSDataInputStream.class); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(9); when(in.read(position, buf, bufOffset, totalLen)).thenReturn(-1); exception.expect(IOException.class); exception.expectMessage("EOF"); - HFileBlock.positionalReadWithExtra(in, position, buf, bufOffset, - necessaryLen, extraLen); + BlockIOUtils.preadWithExtra(bb, in, position, necessaryLen, extraLen); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index e93b61e..a4135d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -398,23 +398,25 @@ public class TestChecksum { return b; } + @Override - protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size, + protected boolean readAtOffset(FSDataInputStream istream, ByteBuff dest, int size, boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { - int returnValue = super.readAtOffset(istream, dest, destOffset, size, peekIntoNextBlock, - fileOffset, pread); + int destOffset = dest.position(); + boolean returnValue = + super.readAtOffset(istream, dest, size, peekIntoNextBlock, fileOffset, pread); if (!corruptDataStream) { return returnValue; } // Corrupt 3rd character of block magic of next block's header. if (peekIntoNextBlock) { - dest[destOffset + size + 3] = 0b00000000; + dest.put(destOffset + size + 3, (byte) 0b00000000); } // We might be reading this block's header too, corrupt it. - dest[destOffset + 1] = 0b00000000; + dest.put(destOffset + 1, (byte) 0b00000000); // Corrupt non header data if (size > hdrSize) { - dest[destOffset + hdrSize + 1] = 0b00000000; + dest.put(destOffset + hdrSize + 1, (byte) 0b00000000); } return returnValue; }
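
Below is a minimal usage sketch (not part of the commit) showing how the new BlockIOUtils helpers are driven. The caller class and method names here are made up for illustration only; BlockIOUtils is package-private, so its real caller is HFileBlock.FSReaderImpl.readAtOffset in the patch above.

    package org.apache.hadoop.hbase.io.hfile;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.hbase.nio.ByteBuff;
    import org.apache.hadoop.hbase.nio.MultiByteBuff;

    // Hypothetical caller, for illustration only.
    final class BlockIOUtilsSketch {

      /**
       * Reads one block of <code>size</code> bytes at <code>blockOffset</code> and, if available,
       * the next block's header (<code>hdrSize</code> extra bytes) in a single positional read.
       * @return true if the extra header bytes were also read into <code>dest</code>.
       */
      static boolean readBlockWithNextHeader(FSDataInputStream in, ByteBuff dest, long blockOffset,
          int size, int hdrSize) throws IOException {
        // dest may span several ByteBuffers (e.g. pooled buffers); the helper fills them in order.
        return BlockIOUtils.preadWithExtra(dest, in, blockOffset, size, hdrSize);
      }

      static ByteBuff allocateDestination(int size, int hdrSize) {
        // A MultiByteBuff destination works too: the block bytes fill the first buffer and the
        // peeked next-block header spills into the second.
        return new MultiByteBuff(ByteBuffer.allocate(size), ByteBuffer.allocate(hdrSize));
      }
    }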
