HBASE-11331 [blockcache] lazy block decompression When hbase.block.data.cachecompressed=true, DATA (and ENCODED_DATA) blocks are cached in the BlockCache in their on-disk format. This is different from the default behavior, which decompresses and decrypts a block before caching.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/eec15bd1 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/eec15bd1 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/eec15bd1 Branch: refs/heads/master Commit: eec15bd17218a2543c9f4cbb9a78841a9fdec043 Parents: 7066de6 Author: Nick Dimiduk <[email protected]> Authored: Tue Sep 9 15:50:53 2014 -0700 Committer: Nick Dimiduk <[email protected]> Committed: Wed Sep 10 15:22:13 2014 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/io/hfile/HFileContext.java | 17 + .../tmpl/regionserver/BlockCacheTmpl.jamon | 6 +- .../hadoop/hbase/io/hfile/CacheConfig.java | 66 +-- .../hadoop/hbase/io/hfile/HFileBlock.java | 441 +++++++++++-------- .../hadoop/hbase/io/hfile/HFileBlockIndex.java | 26 +- .../hadoop/hbase/io/hfile/HFileReaderV2.java | 28 +- .../hadoop/hbase/io/hfile/HFileWriterV2.java | 6 +- .../hadoop/hbase/io/hfile/LruBlockCache.java | 100 ++++- .../hadoop/hbase/io/hfile/TestCacheConfig.java | 3 +- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 60 ++- .../hadoop/hbase/io/hfile/TestChecksum.java | 10 +- .../io/hfile/TestForceCacheImportantBlocks.java | 19 +- .../apache/hadoop/hbase/io/hfile/TestHFile.java | 3 +- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 92 ++-- .../io/hfile/TestHFileBlockCompatibility.java | 18 +- .../hbase/io/hfile/TestHFileEncryption.java | 13 +- .../hbase/io/hfile/TestHFileWriterV2.java | 12 +- .../hbase/io/hfile/TestHFileWriterV3.java | 13 +- .../hfile/TestLazyDataBlockDecompression.java | 231 ++++++++++ 19 files changed, 834 insertions(+), 330 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java index 3299e41..8cfc8a7 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java @@ -96,6 +96,23 @@ public class HFileContext implements HeapSize, Cloneable { this.cryptoContext = cryptoContext; } + /** + * @return true when on-disk blocks from this file are compressed, and/or encrypted; + * false otherwise. + */ + public boolean isCompressedOrEncrypted() { + Compression.Algorithm compressAlgo = getCompression(); + boolean compressed = + compressAlgo != null + && compressAlgo != Compression.Algorithm.NONE; + + Encryption.Context cryptoContext = getEncryptionContext(); + boolean encrypted = cryptoContext != null + && cryptoContext != Encryption.Context.NONE; + + return compressed || encrypted; + } + public Compression.Algorithm getCompression() { return compressAlgo; } http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon index 162ac46..a6a3cf9 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon @@ -157,9 +157,9 @@ org.apache.hadoop.util.StringUtils; reader is closed</td> </tr> <tr> - <td>Compress blocks</td> - <td><% cacheConfig.shouldCacheCompressed() %></td> - <td>True if blocks are compressed in cache</td> + <td>Cache DATA in compressed format</td> + <td><% cacheConfig.shouldCacheDataCompressed() %></td> + <td>True if DATA blocks are cached in their compressed form</td> </tr> <tr> <td>Prefetch on Open</td> http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 82bbeee..3ffac52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -64,11 +64,10 @@ public class CacheConfig { "hfile.block.bloom.cacheonwrite"; /** - * TODO: Implement this (jgray) - * Configuration key to cache data blocks in compressed format. + * Configuration key to cache data blocks in compressed and/or encrypted format. */ public static final String CACHE_DATA_BLOCKS_COMPRESSED_KEY = - "hbase.rs.blockcache.cachedatacompressed"; + "hbase.block.data.cachecompressed"; /** * Configuration key to evict all blocks of a given file from the block cache @@ -119,6 +118,14 @@ public class CacheConfig { public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen"; + /** + * The target block size used by blockcache instances. Defaults to + * {@link HConstants#DEFAULT_BLOCKSIZE}. + * TODO: this config point is completely wrong, as it's used to determine the + * target block size of BlockCache instances. Rename. + */ + public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize"; + // Defaults public static final boolean DEFAULT_CACHE_DATA_ON_READ = true; @@ -127,7 +134,7 @@ public class CacheConfig { public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false; public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false; public static final boolean DEFAULT_EVICT_ON_CLOSE = false; - public static final boolean DEFAULT_COMPRESSED_CACHE = false; + public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false; public static final boolean DEFAULT_PREFETCH_ON_OPEN = false; /** Local reference to the block cache, null if completely disabled */ @@ -156,8 +163,8 @@ public class CacheConfig { /** Whether blocks of a file should be evicted when the file is closed */ private boolean evictOnClose; - /** Whether data blocks should be stored in compressed form in the cache */ - private final boolean cacheCompressed; + /** Whether data blocks should be stored in compressed and/or encrypted form in the cache */ + private final boolean cacheDataCompressed; /** Whether data blocks should be prefetched into the cache */ private final boolean prefetchOnOpen; @@ -189,7 +196,7 @@ public class CacheConfig { DEFAULT_CACHE_BLOOMS_ON_WRITE) || family.shouldCacheBloomsOnWrite(), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen(), conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, @@ -208,13 +215,10 @@ public class CacheConfig { DEFAULT_IN_MEMORY, // This is a family-level setting so can't be set // strictly from conf conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_DATA_ON_WRITE), - conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_INDEXES_ON_WRITE), - conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, - DEFAULT_CACHE_BLOOMS_ON_WRITE), + conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_INDEXES_ON_WRITE), + conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE), conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE), - conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, - DEFAULT_COMPRESSED_CACHE), + conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN), conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) @@ -232,7 +236,7 @@ public class CacheConfig { * @param cacheIndexesOnWrite whether index blocks should be cached on write * @param cacheBloomsOnWrite whether blooms should be cached on write * @param evictOnClose whether blocks should be evicted when HFile is closed - * @param cacheCompressed whether to store blocks as compressed in the cache + * @param cacheDataCompressed whether to store blocks as compressed in the cache * @param prefetchOnOpen whether to prefetch blocks upon open * @param cacheDataInL1 If more than one cache tier deployed, if true, cache this column families * data blocks up in the L1 tier. @@ -241,7 +245,7 @@ public class CacheConfig { final boolean cacheDataOnRead, final boolean inMemory, final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite, final boolean cacheBloomsOnWrite, final boolean evictOnClose, - final boolean cacheCompressed, final boolean prefetchOnOpen, + final boolean cacheDataCompressed, final boolean prefetchOnOpen, final boolean cacheDataInL1) { this.blockCache = blockCache; this.cacheDataOnRead = cacheDataOnRead; @@ -250,7 +254,7 @@ public class CacheConfig { this.cacheIndexesOnWrite = cacheIndexesOnWrite; this.cacheBloomsOnWrite = cacheBloomsOnWrite; this.evictOnClose = evictOnClose; - this.cacheCompressed = cacheCompressed; + this.cacheDataCompressed = cacheDataCompressed; this.prefetchOnOpen = prefetchOnOpen; this.cacheDataInL1 = cacheDataInL1; LOG.info(this); @@ -264,7 +268,7 @@ public class CacheConfig { this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory, cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose, - cacheConf.cacheCompressed, cacheConf.prefetchOnOpen, + cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen, cacheConf.cacheDataInL1); } @@ -298,14 +302,13 @@ public class CacheConfig { * available. */ public boolean shouldCacheBlockOnRead(BlockCategory category) { - boolean shouldCache = isBlockCacheEnabled() + return isBlockCacheEnabled() && (cacheDataOnRead || category == BlockCategory.INDEX || category == BlockCategory.BLOOM || (prefetchOnOpen && (category != BlockCategory.META && category != BlockCategory.UNKNOWN))); - return shouldCache; } /** @@ -384,10 +387,23 @@ public class CacheConfig { } /** - * @return true if blocks should be compressed in the cache, false if not + * @return true if data blocks should be compressed in the cache, false if not */ - public boolean shouldCacheCompressed() { - return isBlockCacheEnabled() && this.cacheCompressed; + public boolean shouldCacheDataCompressed() { + return isBlockCacheEnabled() && this.cacheDataCompressed; + } + + /** + * @return true if this {@link BlockCategory} should be compressed in blockcache, false otherwise + */ + public boolean shouldCacheCompressed(BlockCategory category) { + if (!isBlockCacheEnabled()) return false; + switch (category) { + case DATA: + return this.cacheDataCompressed; + default: + return false; + } } /** @@ -408,7 +424,7 @@ public class CacheConfig { ", cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + ", cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + ", cacheEvictOnClose=" + shouldEvictOnClose() + - ", cacheCompressed=" + shouldCacheCompressed() + + ", cacheDataCompressed=" + shouldCacheDataCompressed() + ", prefetchOnOpen=" + shouldPrefetchOnOpen(); } @@ -449,7 +465,7 @@ public class CacheConfig { */ private static LruBlockCache getL1(final Configuration c, final MemoryUsage mu) { long lruCacheSize = getLruCacheSize(c, mu); - int blockSize = c.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); + int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); LOG.info("Allocating LruBlockCache size=" + StringUtils.byteDesc(lruCacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize)); return new LruBlockCache(lruCacheSize, blockSize, true, c); @@ -466,7 +482,7 @@ public class CacheConfig { String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null); if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null; - int blockSize = c.getInt("hbase.offheapcache.minblocksize", HConstants.DEFAULT_BLOCKSIZE); + int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE); float bucketCachePercentage = c.getFloat(BUCKET_CACHE_SIZE_KEY, 0F); long bucketCacheSize = (long) (bucketCachePercentage < 1? mu.getMax() * bucketCachePercentage: bucketCachePercentage * 1024 * 1024); http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 3e26107..75a87a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -36,9 +36,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext; import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext; @@ -64,25 +61,26 @@ import com.google.common.base.Preconditions; * information from the block index are required to read a block. * <li>In version 2 a block is structured as follows: * <ul> + * <li>header (see {@link Writer#finishBlock()}) + * <ul> * <li>Magic record identifying the block type (8 bytes) - * <li>Compressed block size, header not included (4 bytes) - * <li>Uncompressed block size, header not included (4 bytes) + * <li>Compressed block size, excluding header, including checksum (4 bytes) + * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes) * <li>The offset of the previous block of the same type (8 bytes). This is * used to be able to navigate to the previous block without going to the block - * <li>For minorVersions >=1, there is an additional 4 byte field - * bytesPerChecksum that records the number of bytes in a checksum chunk. - * <li>For minorVersions >=1, there is a 4 byte value to store the size of - * data on disk (excluding the checksums) + * <li>For minorVersions >=1, the ordinal describing checksum type (1 byte) + * <li>For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes) + * <li>For minorVersions >=1, the size of data on disk, including header, + * excluding checksums (4 bytes) + * </ul> + * </li> + * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the + * same for all the blocks in the {@link HFile}, similarly to what was done in + * version 1. * <li>For minorVersions >=1, a series of 4 byte checksums, one each for * the number of bytes specified by bytesPerChecksum. - * index. - * <li>Compressed data (or uncompressed data if compression is disabled). The - * compression algorithm is the same for all the blocks in the {@link HFile}, - * similarly to what was done in version 1. * </ul> * </ul> - * The version 2 block representation in the block cache is the same as above, - * except that the data section is always uncompressed in the cache. */ @InterfaceAudience.Private public class HFileBlock implements Cacheable { @@ -111,7 +109,7 @@ public class HFileBlock implements Cacheable { ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false); // meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader - public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE + Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG; /** @@ -136,6 +134,9 @@ public class HFileBlock implements Cacheable { HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum); ourBuffer.offset = buf.getLong(); ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt(); + if (ourBuffer.hasNextBlockHeader()) { + ourBuffer.buf.limit(ourBuffer.buf.limit() - ourBuffer.headerSize()); + } return ourBuffer; } @@ -155,23 +156,28 @@ public class HFileBlock implements Cacheable { .registerDeserializer(blockDeserializer); } + /** Type of block. Header field 0. */ private BlockType blockType; - /** Size on disk without the header. It includes checksum data too. */ + /** Size on disk excluding header, including checksum. Header field 1. */ private int onDiskSizeWithoutHeader; - /** Size of pure data. Does not include header or checksums */ + /** Size of pure data. Does not include header or checksums. Header field 2. */ private final int uncompressedSizeWithoutHeader; - /** The offset of the previous block on disk */ + /** The offset of the previous block on disk. Header field 3. */ private final long prevBlockOffset; - /** Size on disk of header and data. Does not include checksum data */ + /** + * Size on disk of header + data. Excludes checksum. Header field 6, + * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS checksum. + */ private final int onDiskDataSizeWithHeader; /** The in-memory representation of the hfile block */ private ByteBuffer buf; - /** Meta data that holds meta information on the hfileblock**/ + + /** Meta data that holds meta information on the hfileblock */ private HFileContext fileContext; /** @@ -193,27 +199,18 @@ public class HFileBlock implements Cacheable { * and is sitting in a byte buffer. * * @param blockType the type of this block, see {@link BlockType} - * @param onDiskSizeWithoutHeader compressed size of the block if compression - * is used, otherwise uncompressed size, header size not included - * @param uncompressedSizeWithoutHeader uncompressed size of the block, - * header size not included. Equals onDiskSizeWithoutHeader if - * compression is disabled. - * @param prevBlockOffset the offset of the previous block in the - * {@link HFile} + * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader} + * @param uncompressedSizeWithoutHeader see {@link #uncompressedSizeWithoutHeader} + * @param prevBlockOffset see {@link #prevBlockOffset} * @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes) followed by * uncompressed data. This - * @param fillHeader true to fill in the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of - * the buffer based on the header fields provided + * @param fillHeader when true, parse {@code buf} and override the first 4 header fields. * @param offset the file offset the block was read from - * @param bytesPerChecksum the number of bytes per checksum chunk - * @param checksumType the checksum algorithm to use - * @param onDiskDataSizeWithHeader size of header and data on disk not - * including checksum data + * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader} * @param fileContext HFile meta data */ - HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, - int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf, - boolean fillHeader, long offset, + HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int uncompressedSizeWithoutHeader, + long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset, int onDiskDataSizeWithHeader, HFileContext fileContext) { this.blockType = blockType; this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader; @@ -225,6 +222,22 @@ public class HFileBlock implements Cacheable { this.offset = offset; this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader; this.fileContext = fileContext; + this.buf.rewind(); + } + + /** + * Copy constructor. Creates a shallow copy of {@code that}'s buffer. + */ + HFileBlock(HFileBlock that) { + this.blockType = that.blockType; + this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader; + this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader; + this.prevBlockOffset = that.prevBlockOffset; + this.buf = that.buf.duplicate(); + this.offset = that.offset; + this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader; + this.fileContext = that.fileContext; + this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader; } /** @@ -272,28 +285,21 @@ public class HFileBlock implements Cacheable { } /** - * @return the on-disk size of the block with header size included. This - * includes the header, the data and the checksum data. + * @return the on-disk size of header + data part + checksum. */ public int getOnDiskSizeWithHeader() { return onDiskSizeWithoutHeader + headerSize(); } /** - * Returns the size of the compressed part of the block in case compression - * is used, or the uncompressed size of the data part otherwise. Header size - * and checksum data size is not included. - * - * @return the on-disk size of the data part of the block, header and - * checksum not included. + * @return the on-disk size of the data part + checksum (header excluded). */ public int getOnDiskSizeWithoutHeader() { return onDiskSizeWithoutHeader; } /** - * @return the uncompressed size of the data part of the block, header not - * included + * @return the uncompressed size of data part (header and checksum excluded). */ public int getUncompressedSizeWithoutHeader() { return uncompressedSizeWithoutHeader; @@ -308,8 +314,8 @@ public class HFileBlock implements Cacheable { } /** - * Writes header fields into the first {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the - * buffer. Resets the buffer position to the end of header as side effect. + * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position + * is modified as side-effect. */ private void overwriteHeader() { buf.rewind(); @@ -320,11 +326,9 @@ public class HFileBlock implements Cacheable { } /** - * Returns a buffer that does not include the header. The array offset points - * to the start of the block data right after the header. The underlying data - * array is not copied. Checksum data is not included in the returned buffer. + * Returns a buffer that does not include the header or checksum. * - * @return the buffer with header skipped + * @return the buffer with header skipped and checksum omitted. */ public ByteBuffer getBufferWithoutHeader() { return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(), @@ -336,7 +340,7 @@ public class HFileBlock implements Cacheable { * modify the buffer object. This method has to be public because it is * used in {@link CompoundBloomFilter} to avoid object creation on every * Bloom filter lookup, but has to be used with caution. Checksum data - * is not included in the returned buffer. + * is not included in the returned buffer but header data is. * * @return the buffer of this block for read-only operations */ @@ -350,17 +354,17 @@ public class HFileBlock implements Cacheable { * not modify the buffer object. This method has to be public because it is * used in {@link BucketCache} to avoid buffer copy. * - * @return the byte buffer with header included for read-only operations + * @return the buffer with header and checksum included for read-only operations */ public ByteBuffer getBufferReadOnlyWithHeader() { return ByteBuffer.wrap(buf.array(), buf.arrayOffset(), buf.limit()).slice(); } /** - * Returns a byte buffer of this block, including header data, positioned at + * Returns a byte buffer of this block, including header data and checksum, positioned at * the beginning of header. The underlying data array is not copied. * - * @return the byte buffer with header included + * @return the byte buffer with header and checksum included */ ByteBuffer getBufferWithHeader() { ByteBuffer dupBuf = buf.duplicate(); @@ -376,22 +380,25 @@ public class HFileBlock implements Cacheable { } } + private void sanityCheckAssertion(BlockType valueFromBuf, BlockType valueFromField) + throws IOException { + if (valueFromBuf != valueFromField) { + throw new IOException("Block type stored in the buffer: " + + valueFromBuf + ", block type field: " + valueFromField); + } + } + /** * Checks if the block is internally consistent, i.e. the first - * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a valid header consistent - * with the fields. This function is primary for testing and debugging, and - * is not thread-safe, because it alters the internal buffer pointer. + * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a + * valid header consistent with the fields. Assumes a packed block structure. + * This function is primary for testing and debugging, and is not + * thread-safe, because it alters the internal buffer pointer. */ void sanityCheck() throws IOException { buf.rewind(); - { - BlockType blockTypeFromBuf = BlockType.read(buf); - if (blockTypeFromBuf != blockType) { - throw new IOException("Block type stored in the buffer: " + - blockTypeFromBuf + ", block type field: " + blockType); - } - } + sanityCheckAssertion(BlockType.read(buf), blockType); sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader, "onDiskSizeWithoutHeader"); @@ -403,45 +410,65 @@ public class HFileBlock implements Cacheable { if (this.fileContext.isUseHBaseChecksum()) { sanityCheckAssertion(buf.get(), this.fileContext.getChecksumType().getCode(), "checksumType"); sanityCheckAssertion(buf.getInt(), this.fileContext.getBytesPerChecksum(), "bytesPerChecksum"); - sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, - "onDiskDataSizeWithHeader"); + sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader, "onDiskDataSizeWithHeader"); } int cksumBytes = totalChecksumBytes(); - int hdrSize = headerSize(); - int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() + - cksumBytes; + int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes; if (buf.limit() != expectedBufLimit) { throw new AssertionError("Expected buffer limit " + expectedBufLimit + ", got " + buf.limit()); } // We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read the next - // block's, header, so there are two sensible values for buffer capacity. - int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes; - if (buf.capacity() != size && - buf.capacity() != size + hdrSize) { + // block's header, so there are two sensible values for buffer capacity. + int hdrSize = headerSize(); + if (buf.capacity() != expectedBufLimit && + buf.capacity() != expectedBufLimit + hdrSize) { throw new AssertionError("Invalid buffer capacity: " + buf.capacity() + - ", expected " + size + " or " + (size + hdrSize)); + ", expected " + expectedBufLimit + " or " + (expectedBufLimit + hdrSize)); } } @Override public String toString() { - return "blockType=" - + blockType - + ", onDiskSizeWithoutHeader=" - + onDiskSizeWithoutHeader - + ", uncompressedSizeWithoutHeader=" - + uncompressedSizeWithoutHeader - + ", prevBlockOffset=" - + prevBlockOffset - + ", dataBeginsWith=" - + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), - Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())) - + ", fileOffset=" + offset; + StringBuilder sb = new StringBuilder() + .append("HFileBlock [") + .append(" fileOffset=").append(offset) + .append(" headerSize()=").append(headerSize()) + .append(" blockType=").append(blockType) + .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader) + .append(" uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader) + .append(" prevBlockOffset=").append(prevBlockOffset) + .append(" isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum()); + if (fileContext.isUseHBaseChecksum()) { + sb.append(" checksumType=").append(ChecksumType.codeToType(this.buf.get(24))) + .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1)) + .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader); + } else { + sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader) + .append("(").append(onDiskSizeWithoutHeader) + .append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")"); + } + sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader()) + .append(" totalChecksumBytes()=").append(totalChecksumBytes()) + .append(" isUnpacked()=").append(isUnpacked()) + .append(" buf=[ ") + .append(buf) + .append(", array().length=").append(buf.array().length) + .append(", arrayOffset()=").append(buf.arrayOffset()) + .append(" ]") + .append(" dataBeginsWith=") + .append(Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(), + Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()))) + .append(" fileContext=").append(fileContext) + .append(" ]"); + return sb.toString(); } + /** + * Called after reading a block with provided onDiskSizeWithHeader. + */ private void validateOnDiskSizeWithoutHeader( int expectedOnDiskSizeWithoutHeader) throws IOException { if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) { @@ -457,32 +484,80 @@ public class HFileBlock implements Cacheable { } /** + * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its + * encoded structure. Internal structures are shared between instances where applicable. + */ + HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException { + if (!fileContext.isCompressedOrEncrypted()) { + // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean), + // which is used for block serialization to L2 cache, does not preserve encoding and + // encryption details. + return this; + } + + HFileBlock unpacked = new HFileBlock(this); + unpacked.allocateBuffer(); // allocates space for the decompressed block + + HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ? + reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext(); + ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(), + unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(), + this.getBufferReadOnlyWithHeader().array(), this.headerSize()); + + // Preserve the next block's header bytes in the new block if we have them. + if (unpacked.hasNextBlockHeader()) { + System.arraycopy(this.buf.array(), this.buf.arrayOffset() + this.onDiskDataSizeWithHeader, + unpacked.buf.array(), unpacked.buf.arrayOffset() + unpacked.headerSize() + + unpacked.uncompressedSizeWithoutHeader + unpacked.totalChecksumBytes(), + unpacked.headerSize()); + } + return unpacked; + } + + /** + * Return true when this buffer includes next block's header. + */ + private boolean hasNextBlockHeader() { + return nextBlockOnDiskSizeWithHeader > 0; + } + + /** * Always allocates a new buffer of the correct size. Copies header bytes * from the existing buffer. Does not change header fields. * Reserve room to keep checksum bytes too. - * - * @param extraBytes whether to reserve room in the buffer to read the next - * block's header */ - private void allocateBuffer(boolean extraBytes) { + private void allocateBuffer() { int cksumBytes = totalChecksumBytes(); - int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader + - cksumBytes + - (extraBytes ? headerSize() : 0); + int headerSize = headerSize(); + int capacityNeeded = headerSize + uncompressedSizeWithoutHeader + + cksumBytes + (hasNextBlockHeader() ? headerSize : 0); ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded); // Copy header bytes. System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(), - newBuf.arrayOffset(), headerSize()); + newBuf.arrayOffset(), headerSize); buf = newBuf; - buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes); + // set limit to exclude next block's header + buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes); + } + + /** + * Return true when this block's buffer has been unpacked, false otherwise. Note this is a + * calculated heuristic, not tracked attribute of the block. + */ + public boolean isUnpacked() { + final int cksumBytes = totalChecksumBytes(); + final int headerSize = headerSize(); + final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes; + final int bufCapacity = buf.capacity(); + return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize; } - /** An additional sanity-check in case no compression is being used. */ + /** An additional sanity-check in case no compression or encryption is being used. */ public void assumeUncompressed() throws IOException { - if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + + if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader + totalChecksumBytes()) { throw new IOException("Using no compression but " + "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", " @@ -512,7 +587,7 @@ public class HFileBlock implements Cacheable { } /** - * @return a byte stream reading the data section of this block + * @return a byte stream reading the data + checksum of this block */ public DataInputStream getByteStream() { return new DataInputStream(new ByteArrayInputStream(buf.array(), @@ -588,7 +663,6 @@ public class HFileBlock implements Cacheable { return nextBlockOnDiskSizeWithHeader; } - /** * Unified version 2 {@link HFile} block writer. The intended usage pattern * is as follows: @@ -631,7 +705,7 @@ public class HFileBlock implements Cacheable { /** * Current block type. Set in {@link #startWriting(BlockType)}. Could be - * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA} + * changed in {@link #finishBlock()} from {@link BlockType#DATA} * to {@link BlockType#ENCODED_DATA}. */ private BlockType blockType; @@ -648,7 +722,7 @@ public class HFileBlock implements Cacheable { /** * Bytes to be written to the file system, including the header. Compressed - * if compression is turned on. It also includes the checksum data that + * if compression is turned on. It also includes the checksum data that * immediately follows the block data. (header + data + checksums) */ private byte[] onDiskBytesWithHeader; @@ -1008,6 +1082,19 @@ public class HFileBlock implements Cacheable { return ByteBuffer.wrap(uncompressedBytesWithHeader); } + /** + * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is + * needed for storing packed blocks in the block cache. Expects calling semantics identical to + * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data, + * Does not include checksum data. + * + * @return packed block bytes for caching on write + */ + ByteBuffer getOnDiskBufferWithHeader() { + expectState(State.BLOCK_READY); + return ByteBuffer.wrap(onDiskBytesWithHeader); + } + private void expectState(State expectedState) { if (state != expectedState) { throw new IllegalStateException("Expected state: " + expectedState + @@ -1038,7 +1125,7 @@ public class HFileBlock implements Cacheable { * version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a * 0 value in bytesPerChecksum. */ - public HFileBlock getBlockForCaching() { + public HFileBlock getBlockForCaching(CacheConfig cacheConf) { HFileContext newContext = new HFileContextBuilder() .withBlockSize(fileContext.getBlocksize()) .withBytesPerCheckSum(0) @@ -1051,7 +1138,10 @@ public class HFileBlock implements Cacheable { .withIncludesTags(fileContext.isIncludesTags()) .build(); return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(), - getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(), + getUncompressedSizeWithoutHeader(), prevOffset, + cacheConf.shouldCacheCompressed(blockType.getCategory()) ? + getOnDiskBufferWithHeader() : + getUncompressedBufferWithHeader(), DONT_FILL_HEADER, startOffset, onDiskBytesWithHeader.length + onDiskChecksum.length, newContext); } @@ -1109,7 +1199,7 @@ public class HFileBlock implements Cacheable { /** * Creates a block iterator over the given portion of the {@link HFile}. * The iterator returns blocks starting with offset such that offset <= - * startOffset < endOffset. + * startOffset < endOffset. Returned blocks are always unpacked. * * @param startOffset the offset of the block to start iteration with * @param endOffset the offset to end iteration at (exclusive) @@ -1119,6 +1209,12 @@ public class HFileBlock implements Cacheable { /** Closes the backing streams */ void closeStreams() throws IOException; + + /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */ + HFileBlockDecodingContext getBlockDecodingContext(); + + /** Get the default decoder for blocks from this file. */ + HFileBlockDecodingContext getDefaultBlockDecodingContext(); } /** @@ -1159,6 +1255,7 @@ public class HFileBlock implements Cacheable { @Override public BlockIterator blockRange(final long startOffset, final long endOffset) { + final FSReader owner = this; // handle for inner class return new BlockIterator() { private long offset = startOffset; @@ -1168,7 +1265,7 @@ public class HFileBlock implements Cacheable { return null; HFileBlock b = readBlockData(offset, -1, -1, false); offset += b.getOnDiskSizeWithHeader(); - return b; + return b.unpack(fileContext, owner); } @Override @@ -1274,7 +1371,8 @@ public class HFileBlock implements Cacheable { private HFileBlockDecodingContext encodedBlockDecodingCtx; - private HFileBlockDefaultDecodingContext defaultDecodingCtx; + /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */ + private final HFileBlockDefaultDecodingContext defaultDecodingCtx; private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread = new ThreadLocal<PrefetchedHeader>() { @@ -1290,10 +1388,8 @@ public class HFileBlock implements Cacheable { this.streamWrapper = stream; // Older versions of HBase didn't support checksum. this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum()); - defaultDecodingCtx = - new HFileBlockDefaultDecodingContext(fileContext); - encodedBlockDecodingCtx = - new HFileBlockDefaultDecodingContext(fileContext); + defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext); + encodedBlockDecodingCtx = defaultDecodingCtx; } /** @@ -1434,9 +1530,8 @@ public class HFileBlock implements Cacheable { HFileBlock b = null; if (onDiskSizeWithHeader > 0) { - // We know the total on-disk size but not the uncompressed size. Read - // the entire block into memory, then parse the header and decompress - // from memory if using compression. This code path is used when + // We know the total on-disk size. Read the entire block into memory, + // then parse the header. This code path is used when // doing a random read operation relying on the block index, as well as // when the client knows the on-disk size from peeking into the next // block's header (e.g. this block's header) when reading the previous @@ -1444,7 +1539,8 @@ public class HFileBlock implements Cacheable { // Size that we have to skip in case we have already read the header. int preReadHeaderSize = headerBuf == null ? 0 : hdrSize; - onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; + onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the + // next block's header nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread); @@ -1457,11 +1553,10 @@ public class HFileBlock implements Cacheable { headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize); } // We know the total on-disk size but not the uncompressed size. Read - // the entire block into memory, then parse the header and decompress - // from memory if using compression. Here we have already read the - // block's header + // the entire block into memory, then parse the header. Here we have + // already read the block's header try { - b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); + b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); } catch (IOException ex) { // Seen in load testing. Provide comprehensive debug info. throw new IOException("Failed to read compressed block at " @@ -1499,66 +1594,34 @@ public class HFileBlock implements Cacheable { readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(), hdrSize, false, offset, pread); } - b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum()); + b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum()); onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize]; - System.arraycopy(headerBuf.array(), - headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); + System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize); nextBlockOnDiskSize = readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader() - hdrSize, true, offset + hdrSize, pread); onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize; } - Algorithm compressAlgo = fileContext.getCompression(); - boolean isCompressed = - compressAlgo != null - && compressAlgo != Compression.Algorithm.NONE; - - Encryption.Context cryptoContext = fileContext.getEncryptionContext(); - boolean isEncrypted = cryptoContext != null - && cryptoContext != Encryption.Context.NONE; - - if (!isCompressed && !isEncrypted) { + if (!fileContext.isCompressedOrEncrypted()) { b.assumeUncompressed(); } - if (verifyChecksum && - !validateBlockChecksum(b, onDiskBlock, hdrSize)) { + if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) { return null; // checksum mismatch } - if (isCompressed || isEncrypted) { - // This will allocate a new buffer but keep header bytes. - b.allocateBuffer(nextBlockOnDiskSize > 0); - if (b.blockType == BlockType.ENCODED_DATA) { - encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), - b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, - hdrSize); - } else { - defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(), - b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock, - hdrSize); - } - if (nextBlockOnDiskSize > 0) { - // Copy next block's header bytes into the new block if we have them. - System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(), - b.buf.arrayOffset() + hdrSize - + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(), - hdrSize); - } - } else { - // The onDiskBlock will become the headerAndDataBuffer for this block. - // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already - // contains the header of next block, so no need to set next - // block's header in it. - b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, - onDiskSizeWithHeader), this.fileContext.isUseHBaseChecksum()); - } + // The onDiskBlock will become the headerAndDataBuffer for this block. + // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already + // contains the header of next block, so no need to set next + // block's header in it. + b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader), + this.fileContext.isUseHBaseChecksum()); b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize; // Set prefetched header - if (b.nextBlockOnDiskSizeWithHeader > 0) { + if (b.hasNextBlockHeader()) { prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader(); System.arraycopy(onDiskBlock, onDiskSizeWithHeader, prefetchedHeader.header, 0, hdrSize); @@ -1578,37 +1641,53 @@ public class HFileBlock implements Cacheable { encodedBlockDecodingCtx = encoder.newDataBlockDecodingContext(this.fileContext); } + @Override + public HFileBlockDecodingContext getBlockDecodingContext() { + return this.encodedBlockDecodingCtx; + } + + @Override + public HFileBlockDecodingContext getDefaultBlockDecodingContext() { + return this.defaultDecodingCtx; + } + /** * Generates the checksum for the header as well as the data and * then validates that it matches the value stored in the header. * If there is a checksum mismatch, then return false. Otherwise * return true. */ - protected boolean validateBlockChecksum(HFileBlock block, - byte[] data, int hdrSize) throws IOException { - return ChecksumUtil.validateBlockChecksum(path, block, - data, hdrSize); + protected boolean validateBlockChecksum(HFileBlock block, byte[] data, int hdrSize) + throws IOException { + return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize); } @Override public void closeStreams() throws IOException { streamWrapper.close(); } + + @Override + public String toString() { + return "FSReaderV2 [ hfs=" + hfs + " path=" + path + " fileContext=" + fileContext + " ]"; + } } @Override public int getSerializedLength() { if (buf != null) { - return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE; + // include extra bytes for the next header when it's available. + int extraSpace = hasNextBlockHeader() ? headerSize() : 0; + return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE; } return 0; } @Override public void serialize(ByteBuffer destination) { - ByteBuffer dupBuf = this.buf.duplicate(); - dupBuf.rewind(); - destination.put(dupBuf); + // assumes HeapByteBuffer + destination.put(this.buf.array(), this.buf.arrayOffset(), + getSerializedLength() - EXTRA_SERIALIZATION_SPACE); serializeExtraInfo(destination); } @@ -1656,13 +1735,9 @@ public class HFileBlock implements Cacheable { if (castedComparison.uncompressedSizeWithoutHeader != this.uncompressedSizeWithoutHeader) { return false; } - if (this.buf.compareTo(castedComparison.buf) != 0) { - return false; - } - if (this.buf.position() != castedComparison.buf.position()){ - return false; - } - if (this.buf.limit() != castedComparison.buf.limit()){ + if (Bytes.compareTo(this.buf.array(), this.buf.arrayOffset(), this.buf.limit(), + castedComparison.buf.array(), castedComparison.buf.arrayOffset(), + castedComparison.buf.limit()) != 0) { return false; } return true; @@ -1683,6 +1758,7 @@ public class HFileBlock implements Cacheable { return this.fileContext.getBytesPerChecksum(); } + /** @return the size of data on disk + header. Excludes checksum. */ int getOnDiskDataSizeWithHeader() { return this.onDiskDataSizeWithHeader; } @@ -1736,6 +1812,10 @@ public class HFileBlock implements Cacheable { return DUMMY_HEADER_NO_CHECKSUM; } + /** + * @return the HFileContext used to create this HFileBlock. Not necessary the + * fileContext for the file from which this block's data was originally read. + */ public HFileContext getHFileContext() { return this.fileContext; } @@ -1748,7 +1828,7 @@ public class HFileBlock implements Cacheable { static String toStringHeader(ByteBuffer buf) throws IOException { int offset = buf.arrayOffset(); byte[] b = buf.array(); - long magic = Bytes.toLong(b, offset); + long magic = Bytes.toLong(b, offset); BlockType bt = BlockType.read(buf); offset += Bytes.SIZEOF_LONG; int compressedBlockSizeNoHeader = Bytes.toInt(b, offset); @@ -1775,4 +1855,3 @@ public class HFileBlock implements Cacheable { " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader; } } - http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java index f7b5b9d..1073e93 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java @@ -772,7 +772,7 @@ public class HFileBlockIndex { * {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The * initial value accounts for the root level, and will be increased to two * as soon as we find out there is a leaf-level in - * {@link #blockWritten(long, int)}. + * {@link #blockWritten(long, int, int)}. */ private int numLevels = 1; @@ -798,8 +798,8 @@ public class HFileBlockIndex { /** Whether we require this block index to always be single-level. */ private boolean singleLevelOnly; - /** Block cache, or null if cache-on-write is disabled */ - private BlockCache blockCache; + /** CacheConfig, or null if cache-on-write is disabled */ + private CacheConfig cacheConf; /** Name to use for computing cache keys */ private String nameForCaching; @@ -814,18 +814,17 @@ public class HFileBlockIndex { * Creates a multi-level block index writer. * * @param blockWriter the block writer to use to write index blocks - * @param blockCache if this is not null, index blocks will be cached - * on write into this block cache. + * @param cacheConf used to determine when and how a block should be cached-on-write. */ public BlockIndexWriter(HFileBlock.Writer blockWriter, - BlockCache blockCache, String nameForCaching) { - if ((blockCache == null) != (nameForCaching == null)) { + CacheConfig cacheConf, String nameForCaching) { + if ((cacheConf == null) != (nameForCaching == null)) { throw new IllegalArgumentException("Block cache and file name for " + "caching must be both specified or both null"); } this.blockWriter = blockWriter; - this.blockCache = blockCache; + this.cacheConf = cacheConf; this.nameForCaching = nameForCaching; this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE; } @@ -979,9 +978,9 @@ public class HFileBlockIndex { byte[] curFirstKey = curChunk.getBlockKey(0); blockWriter.writeHeaderAndData(out); - if (blockCache != null) { - HFileBlock blockForCaching = blockWriter.getBlockForCaching(); - blockCache.cacheBlock(new BlockCacheKey(nameForCaching, + if (cacheConf != null) { + HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf); + cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching, beginOffset), blockForCaching); } @@ -1090,8 +1089,7 @@ public class HFileBlockIndex { * entry referring to that block to the parent-level index. */ @Override - public void blockWritten(long offset, int onDiskSize, int uncompressedSize) - { + public void blockWritten(long offset, int onDiskSize, int uncompressedSize) { // Add leaf index block size totalBlockOnDiskSize += onDiskSize; totalBlockUncompressedSize += uncompressedSize; @@ -1156,7 +1154,7 @@ public class HFileBlockIndex { */ @Override public boolean getCacheOnWrite() { - return blockCache != null; + return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java index 1292319..6f67df3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java @@ -249,6 +249,10 @@ public class HFileReaderV2 extends AbstractHFileReader { return new ScannerV2(this, cacheBlocks, pread, isCompaction); } + /** + * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType} + * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary. + */ private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock, boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException { @@ -258,6 +262,9 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics); if (cachedBlock != null) { + if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) { + cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader); + } validateBlockType(cachedBlock, expectedBlockType); if (expectedDataBlockEncoding == null) { @@ -337,6 +344,7 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null); if (cachedBlock != null) { + assert cachedBlock.isUnpacked() : "Packed block leak."; // Return a distinct 'shallow copy' of the block, // so pos does not get messed by the scanner return cachedBlock.getBufferWithoutHeader(); @@ -345,7 +353,7 @@ public class HFileReaderV2 extends AbstractHFileReader { } HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, - blockSize, -1, true); + blockSize, -1, true).unpack(hfileContext, fsBlockReader); // Cache the block if (cacheBlock) { @@ -359,7 +367,7 @@ public class HFileReaderV2 extends AbstractHFileReader { /** * Read in a file block of the given {@link BlockType} and - * {@link DataBlockEncoding}. + * {@link DataBlockEncoding}. Unpacks the block as necessary. * @param dataBlockOffset offset to read. * @param onDiskBlockSize size of the block * @param cacheBlock @@ -400,8 +408,7 @@ public class HFileReaderV2 extends AbstractHFileReader { // the other choice is to duplicate work (which the cache would prevent you // from doing). - BlockCacheKey cacheKey = - new BlockCacheKey(name, dataBlockOffset); + BlockCacheKey cacheKey = new BlockCacheKey(name, dataBlockOffset); boolean useLock = false; IdLock.Entry lockEntry = null; @@ -419,7 +426,7 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction, updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding); if (cachedBlock != null) { - validateBlockType(cachedBlock, expectedBlockType); + assert cachedBlock.isUnpacked() : "Packed block leak."; if (cachedBlock.getBlockType().isData()) { if (updateCacheMetrics) { HFile.dataBlockReadCnt.incrementAndGet(); @@ -448,18 +455,21 @@ public class HFileReaderV2 extends AbstractHFileReader { HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, pread); validateBlockType(hfileBlock, expectedBlockType); + HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); + BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); // Cache the block if necessary - if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) { - cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), - this.cacheConf.isCacheDataInL1()); + if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) { + cacheConf.getBlockCache().cacheBlock(cacheKey, + cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked, + cacheConf.isInMemory(), this.cacheConf.isCacheDataInL1()); } if (updateCacheMetrics && hfileBlock.getBlockType().isData()) { HFile.dataBlockReadCnt.incrementAndGet(); } - return hfileBlock; + return unpacked; } } finally { traceScope.close(); http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java index e6201bf..1d71f3e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java @@ -118,7 +118,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, - cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, + cacheIndexesOnWrite ? cacheConf : null, cacheIndexesOnWrite ? name : null); dataBlockIndexWriter.setMaxChunkSize( HFileBlockIndex.getMaxChunkSize(conf)); @@ -143,7 +143,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { newBlock(); } - /** Clean up the current block */ + /** Clean up the current data block */ private void finishBlock() throws IOException { if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0) return; @@ -191,7 +191,7 @@ public class HFileWriterV2 extends AbstractHFileWriter { * the cache key. */ private void doCacheOnWrite(long offset) { - HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(); + HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf); cacheConf.getBlockCache().cacheBlock( new BlockCacheKey(name, offset), cacheFormatBlock); } http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 1fc2ecb..deeac13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -340,12 +341,35 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); long newSize = updateSizeMetrics(cb, false); map.put(cacheKey, cb); - elements.incrementAndGet(); + long val = elements.incrementAndGet(); + if (LOG.isTraceEnabled()) { + long size = map.size(); + assertCounterSanity(size, val); + } if (newSize > acceptableSize() && !evictionInProgress) { runEviction(); } } + /** + * Sanity-checking for parity between actual block cache content and metrics. + * Intended only for use with TRACE level logging and -ea JVM. + */ + private static void assertCounterSanity(long mapSize, long counterVal) { + if (counterVal < 0) { + LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal + + ", mapSize=" + mapSize); + return; + } + if (mapSize < Integer.MAX_VALUE) { + double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.); + if (pct_diff > 0.05) { + LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal + + ", mapSize=" + mapSize); + } + } + } + private int compare(Cacheable left, Cacheable right) { ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength()); left.serialize(l); @@ -459,7 +483,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) { map.remove(block.getCacheKey()); updateSizeMetrics(block, true); - elements.decrementAndGet(); + long val = elements.decrementAndGet(); + if (LOG.isTraceEnabled()) { + long size = map.size(); + assertCounterSanity(size, val); + } stats.evicted(block.getCachedTime()); if (evictedByEvictionProcess && victimHandler != null) { boolean wait = getCurrentSize() < acceptableSize(); @@ -503,9 +531,12 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { if(bytesToFree <= 0) return; // Instantiate priority buckets - BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize()); - BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize()); - BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize()); + BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize, + singleSize()); + BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize, + multiSize()); + BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize, + memorySize()); // Scan entire map putting into appropriate buckets for(LruCachedBlock cachedBlock : map.values()) { @@ -534,7 +565,15 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { // so the single and multi buckets will be emptied bytesFreed = bucketSingle.free(s); bytesFreed += bucketMulti.free(m); + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + + " from single and multi buckets"); + } bytesFreed += bucketMemory.free(bytesToFree - bytesFreed); + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) + + " total from all three buckets "); + } } else { // this means no need to evict block in memory bucket, // and we try best to make the ratio between single-bucket and @@ -596,6 +635,23 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } } + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("blockCount", getBlockCount()) + .add("currentSize", getCurrentSize()) + .add("freeSize", getFreeSize()) + .add("maxSize", getMaxSize()) + .add("heapSize", heapSize()) + .add("minSize", minSize()) + .add("minFactor", minFactor) + .add("multiSize", multiSize()) + .add("multiFactor", multiFactor) + .add("singleSize", singleSize()) + .add("singleFactor", singleFactor) + .toString(); + } + /** * Used to group blocks into priority buckets. There will be a BlockBucket * for each priority (single, multi, memory). Once bucketed, the eviction @@ -603,11 +659,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * to configuration parameters and their relatives sizes. */ private class BlockBucket implements Comparable<BlockBucket> { + + private final String name; private LruCachedBlockQueue queue; private long totalSize = 0; private long bucketSize; - public BlockBucket(long bytesToFree, long blockSize, long bucketSize) { + public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) { + this.name = name; this.bucketSize = bucketSize; queue = new LruCachedBlockQueue(bytesToFree, blockSize); totalSize = 0; @@ -619,6 +678,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } public long free(long toFree) { + if (LOG.isTraceEnabled()) { + LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this); + } LruCachedBlock cb; long freedBytes = 0; while ((cb = queue.pollLast()) != null) { @@ -627,6 +689,9 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return freedBytes; } } + if (LOG.isTraceEnabled()) { + LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " + this); + } return freedBytes; } @@ -653,8 +718,16 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { @Override public int hashCode() { - // Nothing distingushing about each instance unless I pass in a 'name' or something - return super.hashCode(); + return Objects.hashCode(name, bucketSize, queue, totalSize); + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("name", name) + .add("totalSize", StringUtils.byteDesc(totalSize)) + .add("bucketSize", StringUtils.byteDesc(bucketSize)) + .toString(); } } @@ -769,6 +842,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { LruBlockCache.LOG.info("totalSize=" + StringUtils.byteDesc(totalSize) + ", " + "freeSize=" + StringUtils.byteDesc(freeSize) + ", " + "max=" + StringUtils.byteDesc(this.maxSize) + ", " + + "blockCount=" + getBlockCount() + ", " + "accesses=" + stats.getRequestCount() + ", " + "hits=" + stats.getHitCount() + ", " + "hitRatio=" + (stats.getHitCount() == 0 ? @@ -940,6 +1014,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } /** Clears the cache. Used in tests. */ + @VisibleForTesting public void clearCache() { this.map.clear(); this.elements.set(0); @@ -949,6 +1024,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { * Used in testing. May be very inefficient. * @return the set of cached file names */ + @VisibleForTesting SortedSet<String> getCachedFileNamesForTest() { SortedSet<String> fileNames = new TreeSet<String>(); for (BlockCacheKey cacheKey : map.keySet()) { @@ -969,6 +1045,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { return counts; } + @VisibleForTesting public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() { Map<DataBlockEncoding, Integer> counts = new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class); @@ -986,6 +1063,11 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { victimHandler = handler; } + @VisibleForTesting + Map<BlockCacheKey, LruCachedBlock> getMapForTests() { + return map; + } + BucketCache getVictimHandler() { return this.victimHandler; } @@ -994,4 +1076,4 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { public BlockCache[] getBlockCaches() { return null; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java index 343caad..2aacc67 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java @@ -182,10 +182,9 @@ public class TestCacheConfig { if (sizing) { long originalSize = bc.getCurrentSize(); bc.cacheBlock(bck, c, cc.isInMemory(), cc.isCacheDataInL1()); - long size = bc.getCurrentSize(); assertTrue(bc.getCurrentSize() > originalSize); bc.evictBlock(bck); - size = bc.getCurrentSize(); + long size = bc.getCurrentSize(); assertEquals(originalSize, size); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index c7050c8..d8c8669 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -84,6 +85,7 @@ public class TestCacheOnWrite { private final Compression.Algorithm compress; private final BlockEncoderTestType encoderType; private final HFileDataBlockEncoder encoder; + private final boolean cacheCompressedData; private static final int DATA_BLOCK_SIZE = 2048; private static final int NUM_KV = 25000; @@ -154,14 +156,15 @@ public class TestCacheOnWrite { } } - public TestCacheOnWrite(CacheOnWriteType cowType, - Compression.Algorithm compress, BlockEncoderTestType encoderType) { + public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm compress, + BlockEncoderTestType encoderType, boolean cacheCompressedData) { this.cowType = cowType; this.compress = compress; this.encoderType = encoderType; this.encoder = encoderType.getEncoder(); - testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + - ", encoderType=" + encoderType + "]"; + this.cacheCompressedData = cacheCompressedData; + testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress + + ", encoderType=" + encoderType + ", cacheCompressedData=" + cacheCompressedData + "]"; System.out.println(testDescription); } @@ -173,7 +176,9 @@ public class TestCacheOnWrite { HBaseTestingUtility.COMPRESSION_ALGORITHMS) { for (BlockEncoderTestType encoderType : BlockEncoderTestType.values()) { - cowTypes.add(new Object[] { cowType, compress, encoderType }); + for (boolean cacheCompressedData : new boolean[] { false, true }) { + cowTypes.add(new Object[] { cowType, compress, encoderType, cacheCompressedData }); + } } } } @@ -189,11 +194,12 @@ public class TestCacheOnWrite { conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, BLOOM_BLOCK_SIZE); conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY, - cowType.shouldBeCached(BlockType.DATA)); + cowType.shouldBeCached(BlockType.DATA)); conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.LEAF_INDEX)); conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, cowType.shouldBeCached(BlockType.BLOOM_CHUNK)); + conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, cacheCompressedData); cowType.modifyConf(conf); fs = HFileSystem.get(conf); cacheConf = new CacheConfig(conf); @@ -225,6 +231,10 @@ public class TestCacheOnWrite { reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath, cacheConf, conf); } LOG.info("HFile information: " + reader); + HFileContext meta = new HFileContextBuilder().withCompression(compress) + .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL) + .withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding()) + .withIncludesTags(useTags).build(); final boolean cacheBlocks = false; final boolean pread = false; HFileScanner scanner = reader.getScanner(cacheBlocks, pread); @@ -248,16 +258,36 @@ public class TestCacheOnWrite { false, true, null, encodingInCache); BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset); - boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null; + HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey, true, false, true); + boolean isCached = fromCache != null; boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType()); - if (shouldBeCached != isCached) { - throw new AssertionError( - "shouldBeCached: " + shouldBeCached+ "\n" + - "isCached: " + isCached + "\n" + - "Test description: " + testDescription + "\n" + - "block: " + block + "\n" + - "encodingInCache: " + encodingInCache + "\n" + - "blockCacheKey: " + blockCacheKey); + assertTrue("shouldBeCached: " + shouldBeCached+ "\n" + + "isCached: " + isCached + "\n" + + "Test description: " + testDescription + "\n" + + "block: " + block + "\n" + + "encodingInCache: " + encodingInCache + "\n" + + "blockCacheKey: " + blockCacheKey, + shouldBeCached == isCached); + if (isCached) { + if (cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) { + if (compress != Compression.Algorithm.NONE) { + assertFalse(fromCache.isUnpacked()); + } + fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader()); + } else { + assertTrue(fromCache.isUnpacked()); + } + // block we cached at write-time and block read from file should be identical + assertEquals(block.getChecksumType(), fromCache.getChecksumType()); + assertEquals(block.getBlockType(), fromCache.getBlockType()); + if (block.getBlockType() == BlockType.ENCODED_DATA) { + assertEquals(block.getDataBlockEncodingId(), fromCache.getDataBlockEncodingId()); + assertEquals(block.getDataBlockEncoding(), fromCache.getDataBlockEncoding()); + } + assertEquals(block.getOnDiskSizeWithHeader(), fromCache.getOnDiskSizeWithHeader()); + assertEquals(block.getOnDiskSizeWithoutHeader(), fromCache.getOnDiskSizeWithoutHeader()); + assertEquals( + block.getUncompressedSizeWithoutHeader(), fromCache.getUncompressedSizeWithoutHeader()); } prevBlock = block; offset += block.getOnDiskSizeWithHeader(); http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index 20a450c..8b29ea1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -124,7 +124,7 @@ public class TestChecksum { assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); // read data back from the hfile, exclude header and checksum - ByteBuffer bb = b.getBufferWithoutHeader(); // read back data + ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read back data DataInputStream in = new DataInputStream( new ByteArrayInputStream( bb.array(), bb.arrayOffset(), bb.limit())); @@ -163,6 +163,7 @@ public class TestChecksum { b = hbr.readBlockData(0, -1, -1, pread); is.close(); b.sanityCheck(); + b = b.unpack(meta, hbr); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes()); @@ -272,12 +273,7 @@ public class TestChecksum { // validate data for (int i = 0; i < 1234; i++) { int val = in.readInt(); - if (val != i) { - String msg = "testChecksumCorruption: data mismatch at index " + - i + " expected " + i + " found " + val; - LOG.warn(msg); - assertEquals(i, val); - } + assertEquals("testChecksumCorruption: data mismatch at index " + i, i, val); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java index 7ed3959..35b4c61 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java @@ -80,14 +80,15 @@ public class TestForceCacheImportantBlocks { @Parameters public static Collection<Object[]> parameters() { // HFile versions - return Arrays.asList(new Object[][] { - new Object[] { new Integer(2), false }, - new Object[] { new Integer(3), true } - }); + return Arrays.asList( + new Object[] { 2, true }, + new Object[] { 2, false }, + new Object[] { 3, true }, + new Object[] { 3, false } + ); } - public TestForceCacheImportantBlocks(int hfileVersion, - boolean cfCacheEnabled) { + public TestForceCacheImportantBlocks(int hfileVersion, boolean cfCacheEnabled) { this.hfileVersion = hfileVersion; this.cfCacheEnabled = cfCacheEnabled; TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, hfileVersion); @@ -110,9 +111,9 @@ public class TestForceCacheImportantBlocks { hcd.setBlocksize(BLOCK_SIZE); hcd.setBlockCacheEnabled(cfCacheEnabled); HRegion region = TEST_UTIL.createTestRegion(TABLE, hcd); + BlockCache cache = region.getStore(hcd.getName()).getCacheConfig().getBlockCache(); + CacheStats stats = cache.getStats(); writeTestData(region); - CacheStats stats = - region.getStores().get(hcd.getName()).getCacheConfig().getBlockCache().getStats(); assertEquals(0, stats.getHitCount()); assertEquals(0, HFile.dataBlockReadCnt.get()); // Do a single get, take count of caches. If we are NOT caching DATA blocks, the miss @@ -141,4 +142,4 @@ public class TestForceCacheImportantBlocks { } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/eec15bd1/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index ef9a74f..11ac986 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ -316,7 +316,8 @@ public class TestHFile extends HBaseTestCase { ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false); ByteBuffer expected = ByteBuffer.wrap(("something to test" + i).getBytes()); - assertTrue("failed to match metadata", actual.compareTo(expected) == 0); + assertEquals("failed to match metadata", + Bytes.toStringBinary(expected), Bytes.toStringBinary(actual)); } }
