HBASE-11331 [blockcache] lazy block decompression
When hbase.block.data.cachecompressed=true, DATA (and ENCODED_DATA) blocks are
cached in the BlockCache in their on-disk format, i.e. still compressed and/or
encrypted. This differs from the default behavior, which decompresses and
decrypts a block before caching it.
Conflicts:
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockCompatibility.java
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b8851309
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b8851309
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b8851309
Branch: refs/heads/0.98
Commit: b8851309e0ff1e04a3a0abd5fbad6d1868a151bc
Parents: 1cf6c7b
Author: Nick Dimiduk <[email protected]>
Authored: Tue Sep 9 15:53:47 2014 -0700
Committer: Nick Dimiduk <[email protected]>
Committed: Wed Sep 10 15:50:45 2014 -0700
----------------------------------------------------------------------
.../hadoop/hbase/io/hfile/HFileContext.java | 17 +
.../tmpl/regionserver/BlockCacheTmpl.jamon | 6 +-
.../hadoop/hbase/io/hfile/CacheConfig.java | 81 ++--
.../hadoop/hbase/io/hfile/HFileBlock.java | 441 +++++++++++--------
.../hadoop/hbase/io/hfile/HFileBlockIndex.java | 30 +-
.../hadoop/hbase/io/hfile/HFileReaderV2.java | 18 +-
.../hadoop/hbase/io/hfile/HFileWriterV2.java | 6 +-
.../hadoop/hbase/io/hfile/LruBlockCache.java | 102 ++++-
.../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 62 ++-
.../hadoop/hbase/io/hfile/TestChecksum.java | 10 +-
.../io/hfile/TestForceCacheImportantBlocks.java | 16 +-
.../apache/hadoop/hbase/io/hfile/TestHFile.java | 3 +-
.../hadoop/hbase/io/hfile/TestHFileBlock.java | 95 ++--
.../io/hfile/TestHFileBlockCompatibility.java | 16 +-
.../hbase/io/hfile/TestHFileEncryption.java | 10 +-
.../hbase/io/hfile/TestHFileWriterV2.java | 12 +-
.../hbase/io/hfile/TestHFileWriterV3.java | 13 +-
.../hfile/TestLazyDataBlockDecompression.java | 231 ++++++++++
18 files changed, 836 insertions(+), 333 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index 3299e41..8cfc8a7 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -96,6 +96,23 @@ public class HFileContext implements HeapSize, Cloneable {
this.cryptoContext = cryptoContext;
}
+ /**
+ * @return true when on-disk blocks from this file are compressed, and/or
encrypted;
+ * false otherwise.
+ */
+ public boolean isCompressedOrEncrypted() {
+ Compression.Algorithm compressAlgo = getCompression();
+ boolean compressed =
+ compressAlgo != null
+ && compressAlgo != Compression.Algorithm.NONE;
+
+ Encryption.Context cryptoContext = getEncryptionContext();
+ boolean encrypted = cryptoContext != null
+ && cryptoContext != Encryption.Context.NONE;
+
+ return compressed || encrypted;
+ }
+
public Compression.Algorithm getCompression() {
return compressAlgo;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
index d0bf9ea..be0b5e0 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
@@ -157,9 +157,9 @@ org.apache.hadoop.util.StringUtils;
reader is closed</td>
</tr>
<tr>
- <td>Compress blocks</td>
- <td><% cacheConfig.shouldCacheCompressed() %></td>
- <td>True if blocks are compressed in cache</td>
+ <td>Cache DATA in compressed format</td>
+ <td><% cacheConfig.shouldCacheDataCompressed() %></td>
+ <td>True if DATA blocks are cached in their compressed form</td>
</tr>
<tr>
<td>Prefetch on Open</td>
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 9153c8a..fea5243 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -29,7 +30,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.DirectMemoryUtils;
import org.apache.hadoop.util.StringUtils;
@@ -61,11 +61,10 @@ public class CacheConfig {
"hfile.block.bloom.cacheonwrite";
/**
- * TODO: Implement this (jgray)
- * Configuration key to cache data blocks in compressed format.
+ * Configuration key to cache data blocks in compressed and/or encrypted
format.
*/
public static final String CACHE_DATA_BLOCKS_COMPRESSED_KEY =
- "hbase.rs.blockcache.cachedatacompressed";
+ "hbase.block.data.cachecompressed";
/**
* Configuration key to evict all blocks of a given file from the block cache
@@ -109,6 +108,14 @@ public class CacheConfig {
public static final String PREFETCH_BLOCKS_ON_OPEN_KEY =
"hbase.rs.prefetchblocksonopen";
+ /**
+ * The target block size used by blockcache instances. Defaults to
+ * {@link HConstants#DEFAULT_BLOCKSIZE}.
+ * TODO: this config point is completely wrong, as it's used to determine the
+ * target block size of BlockCache instances. Rename.
+ */
+ public static final String BLOCKCACHE_BLOCKSIZE_KEY =
"hbase.offheapcache.minblocksize";
+
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -117,7 +124,7 @@ public class CacheConfig {
public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false;
public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false;
public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
- public static final boolean DEFAULT_COMPRESSED_CACHE = false;
+ public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
/** Local reference to the block cache, null if completely disabled */
@@ -144,8 +151,8 @@ public class CacheConfig {
/** Whether blocks of a file should be evicted when the file is closed */
private boolean evictOnClose;
- /** Whether data blocks should be stored in compressed form in the cache */
- private final boolean cacheCompressed;
+ /** Whether data blocks should be stored in compressed and/or encrypted form
in the cache */
+ private final boolean cacheDataCompressed;
/** Whether data blocks should be prefetched into the cache */
private final boolean prefetchOnOpen;
@@ -170,7 +177,7 @@ public class CacheConfig {
DEFAULT_CACHE_BLOOMS_ON_WRITE) ||
family.shouldCacheBloomsOnWrite(),
conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY,
DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(),
- conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY,
DEFAULT_COMPRESSED_CACHE),
+ conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY,
DEFAULT_CACHE_DATA_COMPRESSED),
conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen()
);
@@ -187,15 +194,12 @@ public class CacheConfig {
DEFAULT_IN_MEMORY, // This is a family-level setting so can't be set
// strictly from conf
conf.getBoolean(CACHE_BLOCKS_ON_WRITE_KEY,
DEFAULT_CACHE_DATA_ON_WRITE),
- conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
- DEFAULT_CACHE_INDEXES_ON_WRITE),
- conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
- DEFAULT_CACHE_BLOOMS_ON_WRITE),
+ conf.getBoolean(CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
DEFAULT_CACHE_INDEXES_ON_WRITE),
+ conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
DEFAULT_CACHE_BLOOMS_ON_WRITE),
conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE),
- conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY,
- DEFAULT_COMPRESSED_CACHE),
+ conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY,
DEFAULT_CACHE_DATA_COMPRESSED),
conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN)
- );
+ );
}
/**
@@ -208,14 +212,14 @@ public class CacheConfig {
* @param cacheIndexesOnWrite whether index blocks should be cached on write
* @param cacheBloomsOnWrite whether blooms should be cached on write
* @param evictOnClose whether blocks should be evicted when HFile is closed
- * @param cacheCompressed whether to store blocks as compressed in the cache
+ * @param cacheDataCompressed whether to store blocks as compressed in the
cache
* @param prefetchOnOpen whether to prefetch blocks upon open
*/
CacheConfig(final BlockCache blockCache,
final boolean cacheDataOnRead, final boolean inMemory,
final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
final boolean cacheBloomsOnWrite, final boolean evictOnClose,
- final boolean cacheCompressed, final boolean prefetchOnOpen) {
+ final boolean cacheDataCompressed, final boolean prefetchOnOpen) {
this.blockCache = blockCache;
this.cacheDataOnRead = cacheDataOnRead;
this.inMemory = inMemory;
@@ -223,7 +227,7 @@ public class CacheConfig {
this.cacheIndexesOnWrite = cacheIndexesOnWrite;
this.cacheBloomsOnWrite = cacheBloomsOnWrite;
this.evictOnClose = evictOnClose;
- this.cacheCompressed = cacheCompressed;
+ this.cacheDataCompressed = cacheDataCompressed;
this.prefetchOnOpen = prefetchOnOpen;
}
@@ -235,7 +239,7 @@ public class CacheConfig {
this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory,
cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
- cacheConf.cacheCompressed, cacheConf.prefetchOnOpen);
+ cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen);
}
/**
@@ -267,14 +271,13 @@ public class CacheConfig {
* available.
*/
public boolean shouldCacheBlockOnRead(BlockCategory category) {
- boolean shouldCache = isBlockCacheEnabled()
+ return isBlockCacheEnabled()
&& (cacheDataOnRead ||
category == BlockCategory.INDEX ||
category == BlockCategory.BLOOM ||
(prefetchOnOpen &&
(category != BlockCategory.META &&
category != BlockCategory.UNKNOWN)));
- return shouldCache;
}
/**
@@ -335,10 +338,23 @@ public class CacheConfig {
}
/**
- * @return true if blocks should be compressed in the cache, false if not
+ * @return true if data blocks should be compressed in the cache, false if
not
*/
- public boolean shouldCacheCompressed() {
- return isBlockCacheEnabled() && this.cacheCompressed;
+ public boolean shouldCacheDataCompressed() {
+ return isBlockCacheEnabled() && this.cacheDataCompressed;
+ }
+
+ /**
+ * @return true if this {@link BlockCategory} should be compressed in
blockcache, false otherwise
+ */
+ public boolean shouldCacheCompressed(BlockCategory category) {
+ if (!isBlockCacheEnabled()) return false;
+ switch (category) {
+ case DATA:
+ return this.cacheDataCompressed;
+ default:
+ return false;
+ }
}
/**
@@ -359,7 +375,7 @@ public class CacheConfig {
"[cacheIndexesOnWrite=" + shouldCacheIndexesOnWrite() + "] " +
"[cacheBloomsOnWrite=" + shouldCacheBloomsOnWrite() + "] " +
"[cacheEvictOnClose=" + shouldEvictOnClose() + "] " +
- "[cacheCompressed=" + shouldCacheCompressed() + "]" +
+ "[cacheDataCompressed=" + shouldCacheDataCompressed() + "] " +
"[prefetchOnOpen=" + shouldPrefetchOnOpen() + "]";
}
@@ -369,7 +385,8 @@ public class CacheConfig {
* Static reference to the block cache, or null if no caching should be used
* at all.
*/
- private static BlockCache globalBlockCache;
+ @VisibleForTesting
+ static BlockCache GLOBAL_BLOCK_CACHE_INSTANCE;
/** Boolean whether we have disabled the block cache entirely. */
private static boolean blockCacheDisabled = false;
@@ -381,7 +398,7 @@ public class CacheConfig {
* @return The block cache or <code>null</code>.
*/
private static synchronized BlockCache instantiateBlockCache(Configuration
conf) {
- if (globalBlockCache != null) return globalBlockCache;
+ if (GLOBAL_BLOCK_CACHE_INSTANCE != null) return
GLOBAL_BLOCK_CACHE_INSTANCE;
if (blockCacheDisabled) return null;
float cachePercentage =
conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY,
@@ -398,7 +415,7 @@ public class CacheConfig {
// Calculate the amount of heap to give the heap.
MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
long lruCacheSize = (long) (mu.getMax() * cachePercentage);
- int blockSize = conf.getInt("hbase.offheapcache.minblocksize",
HConstants.DEFAULT_BLOCKSIZE);
+ int blockSize = conf.getInt(BLOCKCACHE_BLOCKSIZE_KEY,
HConstants.DEFAULT_BLOCKSIZE);
long offHeapCacheSize =
(long) (conf.getFloat("hbase.offheapcache.percentage", (float) 0) *
DirectMemoryUtils.getDirectMemorySize());
@@ -450,15 +467,15 @@ public class CacheConfig {
LruBlockCache lruCache = new LruBlockCache(lruCacheSize, blockSize,
true, conf);
lruCache.setVictimCache(bucketCache);
if (bucketCache != null && combinedWithLru) {
- globalBlockCache = new CombinedBlockCache(lruCache, bucketCache);
+ GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(lruCache,
bucketCache);
} else {
- globalBlockCache = lruCache;
+ GLOBAL_BLOCK_CACHE_INSTANCE = lruCache;
}
} else {
LOG.warn("SlabCache is deprecated. Consider BucketCache as a
replacement.");
- globalBlockCache = new DoubleBlockCache(
+ GLOBAL_BLOCK_CACHE_INSTANCE = new DoubleBlockCache(
lruCacheSize, offHeapCacheSize, blockSize, blockSize, conf);
}
- return globalBlockCache;
+ return GLOBAL_BLOCK_CACHE_INSTANCE;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index c43b036..cfe92f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -35,9 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
@@ -63,25 +60,26 @@ import com.google.common.base.Preconditions;
* information from the block index are required to read a block.
* <li>In version 2 a block is structured as follows:
* <ul>
+ * <li>header (see {@link Writer#finishBlock()})
+ * <ul>
* <li>Magic record identifying the block type (8 bytes)
- * <li>Compressed block size, header not included (4 bytes)
- * <li>Uncompressed block size, header not included (4 bytes)
+ * <li>Compressed block size, excluding header, including checksum (4 bytes)
+ * <li>Uncompressed block size, excluding header, excluding checksum (4 bytes)
* <li>The offset of the previous block of the same type (8 bytes). This is
* used to be able to navigate to the previous block without going to the block
- * <li>For minorVersions >=1, there is an additional 4 byte field
- * bytesPerChecksum that records the number of bytes in a checksum chunk.
- * <li>For minorVersions >=1, there is a 4 byte value to store the size of
- * data on disk (excluding the checksums)
+ * <li>For minorVersions >=1, the ordinal describing checksum type (1 byte)
+ * <li>For minorVersions >=1, the number of data bytes/checksum chunk (4 bytes)
+ * <li>For minorVersions >=1, the size of data on disk, including header,
+ * excluding checksums (4 bytes)
+ * </ul>
+ * </li>
+ * <li>Raw/Compressed/Encrypted/Encoded data. The compression algorithm is the
+ * same for all the blocks in the {@link HFile}, similarly to what was done in
+ * version 1.
* <li>For minorVersions >=1, a series of 4 byte checksums, one each for
* the number of bytes specified by bytesPerChecksum.
- * index.
- * <li>Compressed data (or uncompressed data if compression is disabled). The
- * compression algorithm is the same for all the blocks in the {@link HFile},
- * similarly to what was done in version 1.
* </ul>
* </ul>
- * The version 2 block representation in the block cache is the same as above,
- * except that the data section is always uncompressed in the cache.
*/
@InterfaceAudience.Private
public class HFileBlock implements Cacheable {
@@ -110,7 +108,7 @@ public class HFileBlock implements Cacheable {
ByteBuffer.wrap(new byte[0], 0, 0).getClass(), false);
// meta.usesHBaseChecksum+offset+nextBlockOnDiskSizeWithHeader
- public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE +
Bytes.SIZEOF_INT
+ public static final int EXTRA_SERIALIZATION_SPACE = Bytes.SIZEOF_BYTE +
Bytes.SIZEOF_INT
+ Bytes.SIZEOF_LONG;
/**
@@ -135,6 +133,9 @@ public class HFileBlock implements Cacheable {
HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum);
ourBuffer.offset = buf.getLong();
ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
+ if (ourBuffer.hasNextBlockHeader()) {
+ ourBuffer.buf.limit(ourBuffer.buf.limit() -
ourBuffer.headerSize());
+ }
return ourBuffer;
}
@@ -154,23 +155,28 @@ public class HFileBlock implements Cacheable {
.registerDeserializer(blockDeserializer);
}
+ /** Type of block. Header field 0. */
private BlockType blockType;
- /** Size on disk without the header. It includes checksum data too. */
+ /** Size on disk excluding header, including checksum. Header field 1. */
private int onDiskSizeWithoutHeader;
- /** Size of pure data. Does not include header or checksums */
+ /** Size of pure data. Does not include header or checksums. Header field 2.
*/
private final int uncompressedSizeWithoutHeader;
- /** The offset of the previous block on disk */
+ /** The offset of the previous block on disk. Header field 3. */
private final long prevBlockOffset;
- /** Size on disk of header and data. Does not include checksum data */
+ /**
+ * Size on disk of header + data. Excludes checksum. Header field 6,
+ * OR calculated from {@link #onDiskSizeWithoutHeader} when using HDFS
checksum.
+ */
private final int onDiskDataSizeWithHeader;
/** The in-memory representation of the hfile block */
private ByteBuffer buf;
- /** Meta data that holds meta information on the hfileblock**/
+
+ /** Meta data that holds meta information on the hfileblock */
private HFileContext fileContext;
/**
@@ -192,27 +198,18 @@ public class HFileBlock implements Cacheable {
* and is sitting in a byte buffer.
*
* @param blockType the type of this block, see {@link BlockType}
- * @param onDiskSizeWithoutHeader compressed size of the block if compression
- * is used, otherwise uncompressed size, header size not included
- * @param uncompressedSizeWithoutHeader uncompressed size of the block,
- * header size not included. Equals onDiskSizeWithoutHeader if
- * compression is disabled.
- * @param prevBlockOffset the offset of the previous block in the
- * {@link HFile}
+ * @param onDiskSizeWithoutHeader see {@link #onDiskSizeWithoutHeader}
+ * @param uncompressedSizeWithoutHeader see {@link
#uncompressedSizeWithoutHeader}
+ * @param prevBlockOffset see {@link #prevBlockOffset}
* @param buf block header ({@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes)
followed by
* uncompressed data. This
- * @param fillHeader true to fill in the first {@link
HConstants#HFILEBLOCK_HEADER_SIZE} bytes of
- * the buffer based on the header fields provided
+ * @param fillHeader when true, parse {@code buf} and override the first 4
header fields.
* @param offset the file offset the block was read from
- * @param bytesPerChecksum the number of bytes per checksum chunk
- * @param checksumType the checksum algorithm to use
- * @param onDiskDataSizeWithHeader size of header and data on disk not
- * including checksum data
+ * @param onDiskDataSizeWithHeader see {@link #onDiskDataSizeWithHeader}
* @param fileContext HFile meta data
*/
- HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader,
- int uncompressedSizeWithoutHeader, long prevBlockOffset, ByteBuffer buf,
- boolean fillHeader, long offset,
+ HFileBlock(BlockType blockType, int onDiskSizeWithoutHeader, int
uncompressedSizeWithoutHeader,
+ long prevBlockOffset, ByteBuffer buf, boolean fillHeader, long offset,
int onDiskDataSizeWithHeader, HFileContext fileContext) {
this.blockType = blockType;
this.onDiskSizeWithoutHeader = onDiskSizeWithoutHeader;
@@ -224,6 +221,22 @@ public class HFileBlock implements Cacheable {
this.offset = offset;
this.onDiskDataSizeWithHeader = onDiskDataSizeWithHeader;
this.fileContext = fileContext;
+ this.buf.rewind();
+ }
+
+ /**
+ * Copy constructor. Creates a shallow copy of {@code that}'s buffer.
+ */
+ HFileBlock(HFileBlock that) {
+ this.blockType = that.blockType;
+ this.onDiskSizeWithoutHeader = that.onDiskSizeWithoutHeader;
+ this.uncompressedSizeWithoutHeader = that.uncompressedSizeWithoutHeader;
+ this.prevBlockOffset = that.prevBlockOffset;
+ this.buf = that.buf.duplicate();
+ this.offset = that.offset;
+ this.onDiskDataSizeWithHeader = that.onDiskDataSizeWithHeader;
+ this.fileContext = that.fileContext;
+ this.nextBlockOnDiskSizeWithHeader = that.nextBlockOnDiskSizeWithHeader;
}
/**
@@ -271,28 +284,21 @@ public class HFileBlock implements Cacheable {
}
/**
- * @return the on-disk size of the block with header size included. This
- * includes the header, the data and the checksum data.
+ * @return the on-disk size of header + data part + checksum.
*/
public int getOnDiskSizeWithHeader() {
return onDiskSizeWithoutHeader + headerSize();
}
/**
- * Returns the size of the compressed part of the block in case compression
- * is used, or the uncompressed size of the data part otherwise. Header size
- * and checksum data size is not included.
- *
- * @return the on-disk size of the data part of the block, header and
- * checksum not included.
+ * @return the on-disk size of the data part + checksum (header excluded).
*/
public int getOnDiskSizeWithoutHeader() {
return onDiskSizeWithoutHeader;
}
/**
- * @return the uncompressed size of the data part of the block, header not
- * included
+ * @return the uncompressed size of data part (header and checksum excluded).
*/
public int getUncompressedSizeWithoutHeader() {
return uncompressedSizeWithoutHeader;
@@ -307,8 +313,8 @@ public class HFileBlock implements Cacheable {
}
/**
- * Writes header fields into the first {@link
HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the
- * buffer. Resets the buffer position to the end of header as side effect.
+ * Rewinds {@code buf} and writes first 4 header fields. {@code buf} position
+ * is modified as side-effect.
*/
private void overwriteHeader() {
buf.rewind();
@@ -319,11 +325,9 @@ public class HFileBlock implements Cacheable {
}
/**
- * Returns a buffer that does not include the header. The array offset points
- * to the start of the block data right after the header. The underlying data
- * array is not copied. Checksum data is not included in the returned buffer.
+ * Returns a buffer that does not include the header or checksum.
*
- * @return the buffer with header skipped
+ * @return the buffer with header skipped and checksum omitted.
*/
public ByteBuffer getBufferWithoutHeader() {
return ByteBuffer.wrap(buf.array(), buf.arrayOffset() + headerSize(),
@@ -335,7 +339,7 @@ public class HFileBlock implements Cacheable {
* modify the buffer object. This method has to be public because it is
* used in {@link CompoundBloomFilter} to avoid object creation on every
* Bloom filter lookup, but has to be used with caution. Checksum data
- * is not included in the returned buffer.
+ * is not included in the returned buffer but header data is.
*
* @return the buffer of this block for read-only operations
*/
@@ -349,17 +353,17 @@ public class HFileBlock implements Cacheable {
* not modify the buffer object. This method has to be public because it is
* used in {@link BucketCache} to avoid buffer copy.
*
- * @return the byte buffer with header included for read-only operations
+ * @return the buffer with header and checksum included for read-only
operations
*/
public ByteBuffer getBufferReadOnlyWithHeader() {
return ByteBuffer.wrap(buf.array(), buf.arrayOffset(),
buf.limit()).slice();
}
/**
- * Returns a byte buffer of this block, including header data, positioned at
+ * Returns a byte buffer of this block, including header data and checksum,
positioned at
* the beginning of header. The underlying data array is not copied.
*
- * @return the byte buffer with header included
+ * @return the byte buffer with header and checksum included
*/
ByteBuffer getBufferWithHeader() {
ByteBuffer dupBuf = buf.duplicate();
@@ -375,22 +379,25 @@ public class HFileBlock implements Cacheable {
}
}
+ private void sanityCheckAssertion(BlockType valueFromBuf, BlockType
valueFromField)
+ throws IOException {
+ if (valueFromBuf != valueFromField) {
+ throw new IOException("Block type stored in the buffer: " +
+ valueFromBuf + ", block type field: " + valueFromField);
+ }
+ }
+
/**
* Checks if the block is internally consistent, i.e. the first
- * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
valid header consistent
- * with the fields. This function is primary for testing and debugging, and
- * is not thread-safe, because it alters the internal buffer pointer.
+ * {@link HConstants#HFILEBLOCK_HEADER_SIZE} bytes of the buffer contain a
+ * valid header consistent with the fields. Assumes a packed block structure.
+ * This function is primary for testing and debugging, and is not
+ * thread-safe, because it alters the internal buffer pointer.
*/
void sanityCheck() throws IOException {
buf.rewind();
- {
- BlockType blockTypeFromBuf = BlockType.read(buf);
- if (blockTypeFromBuf != blockType) {
- throw new IOException("Block type stored in the buffer: " +
- blockTypeFromBuf + ", block type field: " + blockType);
- }
- }
+ sanityCheckAssertion(BlockType.read(buf), blockType);
sanityCheckAssertion(buf.getInt(), onDiskSizeWithoutHeader,
"onDiskSizeWithoutHeader");
@@ -402,45 +409,65 @@ public class HFileBlock implements Cacheable {
if (this.fileContext.isUseHBaseChecksum()) {
sanityCheckAssertion(buf.get(),
this.fileContext.getChecksumType().getCode(), "checksumType");
sanityCheckAssertion(buf.getInt(),
this.fileContext.getBytesPerChecksum(), "bytesPerChecksum");
- sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader,
- "onDiskDataSizeWithHeader");
+ sanityCheckAssertion(buf.getInt(), onDiskDataSizeWithHeader,
"onDiskDataSizeWithHeader");
}
int cksumBytes = totalChecksumBytes();
- int hdrSize = headerSize();
- int expectedBufLimit = uncompressedSizeWithoutHeader + headerSize() +
- cksumBytes;
+ int expectedBufLimit = onDiskDataSizeWithHeader + cksumBytes;
if (buf.limit() != expectedBufLimit) {
throw new AssertionError("Expected buffer limit " + expectedBufLimit
+ ", got " + buf.limit());
}
// We might optionally allocate HFILEBLOCK_HEADER_SIZE more bytes to read
the next
- // block's, header, so there are two sensible values for buffer capacity.
- int size = uncompressedSizeWithoutHeader + hdrSize + cksumBytes;
- if (buf.capacity() != size &&
- buf.capacity() != size + hdrSize) {
+ // block's header, so there are two sensible values for buffer capacity.
+ int hdrSize = headerSize();
+ if (buf.capacity() != expectedBufLimit &&
+ buf.capacity() != expectedBufLimit + hdrSize) {
throw new AssertionError("Invalid buffer capacity: " + buf.capacity() +
- ", expected " + size + " or " + (size + hdrSize));
+ ", expected " + expectedBufLimit + " or " + (expectedBufLimit +
hdrSize));
}
}
@Override
public String toString() {
- return "blockType="
- + blockType
- + ", onDiskSizeWithoutHeader="
- + onDiskSizeWithoutHeader
- + ", uncompressedSizeWithoutHeader="
- + uncompressedSizeWithoutHeader
- + ", prevBlockOffset="
- + prevBlockOffset
- + ", dataBeginsWith="
- + Bytes.toStringBinary(buf.array(), buf.arrayOffset() + headerSize(),
- Math.min(32, buf.limit() - buf.arrayOffset() - headerSize()))
- + ", fileOffset=" + offset;
+ StringBuilder sb = new StringBuilder()
+ .append("HFileBlock [")
+ .append(" fileOffset=").append(offset)
+ .append(" headerSize()=").append(headerSize())
+ .append(" blockType=").append(blockType)
+ .append(" onDiskSizeWithoutHeader=").append(onDiskSizeWithoutHeader)
+ .append("
uncompressedSizeWithoutHeader=").append(uncompressedSizeWithoutHeader)
+ .append(" prevBlockOffset=").append(prevBlockOffset)
+ .append("
isUseHBaseChecksum()=").append(fileContext.isUseHBaseChecksum());
+ if (fileContext.isUseHBaseChecksum()) {
+ sb.append("
checksumType=").append(ChecksumType.codeToType(this.buf.get(24)))
+ .append(" bytesPerChecksum=").append(this.buf.getInt(24 + 1))
+ .append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader);
+ } else {
+ sb.append(" onDiskDataSizeWithHeader=").append(onDiskDataSizeWithHeader)
+ .append("(").append(onDiskSizeWithoutHeader)
+
.append("+").append(HConstants.HFILEBLOCK_HEADER_SIZE_NO_CHECKSUM).append(")");
+ }
+ sb.append(" getOnDiskSizeWithHeader()=").append(getOnDiskSizeWithHeader())
+ .append(" totalChecksumBytes()=").append(totalChecksumBytes())
+ .append(" isUnpacked()=").append(isUnpacked())
+ .append(" buf=[ ")
+ .append(buf)
+ .append(", array().length=").append(buf.array().length)
+ .append(", arrayOffset()=").append(buf.arrayOffset())
+ .append(" ]")
+ .append(" dataBeginsWith=")
+ .append(Bytes.toStringBinary(buf.array(), buf.arrayOffset() +
headerSize(),
+ Math.min(32, buf.limit() - buf.arrayOffset() - headerSize())))
+ .append(" fileContext=").append(fileContext)
+ .append(" ]");
+ return sb.toString();
}
+ /**
+ * Called after reading a block with provided onDiskSizeWithHeader.
+ */
private void validateOnDiskSizeWithoutHeader(
int expectedOnDiskSizeWithoutHeader) throws IOException {
if (onDiskSizeWithoutHeader != expectedOnDiskSizeWithoutHeader) {
@@ -456,32 +483,80 @@ public class HFileBlock implements Cacheable {
}
/**
+ * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
+ * encoded structure. Internal structures are shared between instances where applicable.
+ */
+ HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
+ if (!fileContext.isCompressedOrEncrypted()) {
+ // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
+ // which is used for block serialization to L2 cache, does not preserve encoding and
+ // encryption details.
+ return this;
+ }
+
+ HFileBlock unpacked = new HFileBlock(this);
+ unpacked.allocateBuffer(); // allocates space for the decompressed block
+
+ HFileBlockDecodingContext ctx = blockType == BlockType.ENCODED_DATA ?
+ reader.getBlockDecodingContext() : reader.getDefaultBlockDecodingContext();
+ ctx.prepareDecoding(unpacked.getOnDiskSizeWithoutHeader(),
+ unpacked.getUncompressedSizeWithoutHeader(), unpacked.getBufferWithoutHeader(),
+ this.getBufferReadOnlyWithHeader().array(), this.headerSize());
+
+ // Preserve the next block's header bytes in the new block if we have them.
+ if (unpacked.hasNextBlockHeader()) {
+ System.arraycopy(this.buf.array(), this.buf.arrayOffset() + this.onDiskDataSizeWithHeader,
+ unpacked.buf.array(), unpacked.buf.arrayOffset() + unpacked.headerSize() +
+ unpacked.uncompressedSizeWithoutHeader + unpacked.totalChecksumBytes(),
+ unpacked.headerSize());
+ }
+ return unpacked;
+ }
+
+ /**
+ * Return true when this buffer includes next block's header.
+ */
+ private boolean hasNextBlockHeader() {
+ return nextBlockOnDiskSizeWithHeader > 0;
+ }
+
+ /**
* Always allocates a new buffer of the correct size. Copies header bytes
* from the existing buffer. Does not change header fields.
* Reserve room to keep checksum bytes too.
- *
- * @param extraBytes whether to reserve room in the buffer to read the next
- * block's header
*/
- private void allocateBuffer(boolean extraBytes) {
+ private void allocateBuffer() {
int cksumBytes = totalChecksumBytes();
- int capacityNeeded = headerSize() + uncompressedSizeWithoutHeader +
- cksumBytes +
- (extraBytes ? headerSize() : 0);
+ int headerSize = headerSize();
+ int capacityNeeded = headerSize + uncompressedSizeWithoutHeader +
+ cksumBytes + (hasNextBlockHeader() ? headerSize : 0);
ByteBuffer newBuf = ByteBuffer.allocate(capacityNeeded);
// Copy header bytes.
System.arraycopy(buf.array(), buf.arrayOffset(), newBuf.array(),
- newBuf.arrayOffset(), headerSize());
+ newBuf.arrayOffset(), headerSize);
buf = newBuf;
- buf.limit(headerSize() + uncompressedSizeWithoutHeader + cksumBytes);
+ // set limit to exclude next block's header
+ buf.limit(headerSize + uncompressedSizeWithoutHeader + cksumBytes);
+ }
+
+ /**
+ * Return true when this block's buffer has been unpacked, false otherwise. Note this is a
+ * calculated heuristic, not tracked attribute of the block.
+ */
+ public boolean isUnpacked() {
+ final int cksumBytes = totalChecksumBytes();
+ final int headerSize = headerSize();
+ final int expectedCapacity = headerSize + uncompressedSizeWithoutHeader + cksumBytes;
+ final int bufCapacity = buf.capacity();
+ return bufCapacity == expectedCapacity || bufCapacity == expectedCapacity + headerSize;
}
- /** An additional sanity-check in case no compression is being used. */
+ /** An additional sanity-check in case no compression or encryption is being used. */
public void assumeUncompressed() throws IOException {
- if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
+ if (onDiskSizeWithoutHeader != uncompressedSizeWithoutHeader +
totalChecksumBytes()) {
throw new IOException("Using no compression but "
+ "onDiskSizeWithoutHeader=" + onDiskSizeWithoutHeader + ", "
@@ -511,7 +586,7 @@ public class HFileBlock implements Cacheable {
}
/**
- * @return a byte stream reading the data section of this block
+ * @return a byte stream reading the data + checksum of this block
*/
public DataInputStream getByteStream() {
return new DataInputStream(new ByteArrayInputStream(buf.array(),
@@ -587,7 +662,6 @@ public class HFileBlock implements Cacheable {
return nextBlockOnDiskSizeWithHeader;
}
-
/**
* Unified version 2 {@link HFile} block writer. The intended usage pattern
* is as follows:
@@ -630,7 +704,7 @@ public class HFileBlock implements Cacheable {
/**
* Current block type. Set in {@link #startWriting(BlockType)}. Could be
- * changed in {@link #encodeDataBlockForDisk()} from {@link BlockType#DATA}
+ * changed in {@link #finishBlock()} from {@link BlockType#DATA}
* to {@link BlockType#ENCODED_DATA}.
*/
private BlockType blockType;
@@ -643,7 +717,7 @@ public class HFileBlock implements Cacheable {
/**
* Bytes to be written to the file system, including the header. Compressed
- * if compression is turned on. It also includes the checksum data that
+ * if compression is turned on. It also includes the checksum data that
* immediately follows the block data. (header + data + checksums)
*/
private byte[] onDiskBytesWithHeader;
@@ -990,6 +1064,19 @@ public class HFileBlock implements Cacheable {
return ByteBuffer.wrap(uncompressedBytesWithHeader);
}
+ /**
+ * Returns the header followed by the on-disk (compressed/encoded/encrypted) data. This is
+ * needed for storing packed blocks in the block cache. Expects calling semantics identical to
+ * {@link #getUncompressedBufferWithHeader()}. Returns only the header and data,
+ * Does not include checksum data.
+ *
+ * @return packed block bytes for caching on write
+ */
+ ByteBuffer getOnDiskBufferWithHeader() {
+ expectState(State.BLOCK_READY);
+ return ByteBuffer.wrap(onDiskBytesWithHeader);
+ }
+
private void expectState(State expectedState) {
if (state != expectedState) {
throw new IllegalStateException("Expected state: " + expectedState +
@@ -1020,7 +1107,7 @@ public class HFileBlock implements Cacheable {
* version is MINOR_VERSION_WITH_CHECKSUM. This is indicated by setting a
* 0 value in bytesPerChecksum.
*/
- public HFileBlock getBlockForCaching() {
+ public HFileBlock getBlockForCaching(CacheConfig cacheConf) {
HFileContext newContext = new HFileContextBuilder()
.withBlockSize(fileContext.getBlocksize())
.withBytesPerCheckSum(0)
@@ -1033,7 +1120,10 @@ public class HFileBlock implements Cacheable {
.withIncludesTags(fileContext.isIncludesTags())
.build();
return new HFileBlock(blockType, getOnDiskSizeWithoutHeader(),
- getUncompressedSizeWithoutHeader(), prevOffset, getUncompressedBufferWithHeader(),
+ getUncompressedSizeWithoutHeader(), prevOffset,
+ cacheConf.shouldCacheCompressed(blockType.getCategory()) ?
+ getOnDiskBufferWithHeader() :
+ getUncompressedBufferWithHeader(),
DONT_FILL_HEADER, startOffset,
onDiskBytesWithHeader.length + onDiskChecksum.length, newContext);
}
@@ -1091,7 +1181,7 @@ public class HFileBlock implements Cacheable {
/**
* Creates a block iterator over the given portion of the {@link HFile}.
* The iterator returns blocks starting with offset such that offset <=
- * startOffset < endOffset.
+ * startOffset < endOffset. Returned blocks are always unpacked.
*
* @param startOffset the offset of the block to start iteration with
* @param endOffset the offset to end iteration at (exclusive)
@@ -1101,6 +1191,12 @@ public class HFileBlock implements Cacheable {
/** Closes the backing streams */
void closeStreams() throws IOException;
+
+ /** Get a decoder for {@link BlockType#ENCODED_DATA} blocks from this file. */
+ HFileBlockDecodingContext getBlockDecodingContext();
+
+ /** Get the default decoder for blocks from this file. */
+ HFileBlockDecodingContext getDefaultBlockDecodingContext();
}
/**
@@ -1141,6 +1237,7 @@ public class HFileBlock implements Cacheable {
@Override
public BlockIterator blockRange(final long startOffset,
final long endOffset) {
+ final FSReader owner = this; // handle for inner class
return new BlockIterator() {
private long offset = startOffset;
@@ -1150,7 +1247,7 @@ public class HFileBlock implements Cacheable {
return null;
HFileBlock b = readBlockData(offset, -1, -1, false);
offset += b.getOnDiskSizeWithHeader();
- return b;
+ return b.unpack(fileContext, owner);
}
@Override
@@ -1256,7 +1353,8 @@ public class HFileBlock implements Cacheable {
private HFileBlockDecodingContext encodedBlockDecodingCtx;
- private HFileBlockDefaultDecodingContext defaultDecodingCtx;
+ /** Default context used when BlockType != {@link BlockType#ENCODED_DATA}. */
+ private final HFileBlockDefaultDecodingContext defaultDecodingCtx;
private ThreadLocal<PrefetchedHeader> prefetchedHeaderForThread =
new ThreadLocal<PrefetchedHeader>() {
@@ -1272,10 +1370,8 @@ public class HFileBlock implements Cacheable {
this.streamWrapper = stream;
// Older versions of HBase didn't support checksum.
this.streamWrapper.prepareForBlockReader(!fileContext.isUseHBaseChecksum());
- defaultDecodingCtx =
- new HFileBlockDefaultDecodingContext(fileContext);
- encodedBlockDecodingCtx =
- new HFileBlockDefaultDecodingContext(fileContext);
+ defaultDecodingCtx = new HFileBlockDefaultDecodingContext(fileContext);
+ encodedBlockDecodingCtx = defaultDecodingCtx;
}
/**
@@ -1416,9 +1512,8 @@ public class HFileBlock implements Cacheable {
HFileBlock b = null;
if (onDiskSizeWithHeader > 0) {
- // We know the total on-disk size but not the uncompressed size. Read
- // the entire block into memory, then parse the header and decompress
- // from memory if using compression. This code path is used when
+ // We know the total on-disk size. Read the entire block into memory,
+ // then parse the header. This code path is used when
// doing a random read operation relying on the block index, as well as
// when the client knows the on-disk size from peeking into the next
// block's header (e.g. this block's header) when reading the previous
@@ -1426,7 +1521,8 @@ public class HFileBlock implements Cacheable {
// Size that we have to skip in case we have already read the header.
int preReadHeaderSize = headerBuf == null ? 0 : hdrSize;
- onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize];
+ onDiskBlock = new byte[onDiskSizeWithHeader + hdrSize]; // room for this block plus the
+ // next block's header
nextBlockOnDiskSize = readAtOffset(is, onDiskBlock,
preReadHeaderSize, onDiskSizeWithHeader - preReadHeaderSize,
true, offset + preReadHeaderSize, pread);
@@ -1439,11 +1535,10 @@ public class HFileBlock implements Cacheable {
headerBuf = ByteBuffer.wrap(onDiskBlock, 0, hdrSize);
}
// We know the total on-disk size but not the uncompressed size. Read
- // the entire block into memory, then parse the header and decompress
- // from memory if using compression. Here we have already read the
- // block's header
+ // the entire block into memory, then parse the header. Here we have
+ // already read the block's header
try {
- b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum());
+ b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
} catch (IOException ex) {
// Seen in load testing. Provide comprehensive debug info.
throw new IOException("Failed to read compressed block at "
@@ -1481,66 +1576,34 @@ public class HFileBlock implements Cacheable {
readAtOffset(is, headerBuf.array(), headerBuf.arrayOffset(),
hdrSize, false, offset, pread);
}
- b = new HFileBlock(headerBuf, this.fileContext.isUseHBaseChecksum());
+ b = new HFileBlock(headerBuf, fileContext.isUseHBaseChecksum());
onDiskBlock = new byte[b.getOnDiskSizeWithHeader() + hdrSize];
- System.arraycopy(headerBuf.array(),
- headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
+ System.arraycopy(headerBuf.array(), headerBuf.arrayOffset(), onDiskBlock, 0, hdrSize);
nextBlockOnDiskSize =
readAtOffset(is, onDiskBlock, hdrSize, b.getOnDiskSizeWithHeader()
- hdrSize, true, offset + hdrSize, pread);
onDiskSizeWithHeader = b.onDiskSizeWithoutHeader + hdrSize;
}
- Algorithm compressAlgo = fileContext.getCompression();
- boolean isCompressed =
- compressAlgo != null
- && compressAlgo != Compression.Algorithm.NONE;
-
- Encryption.Context cryptoContext = fileContext.getEncryptionContext();
- boolean isEncrypted = cryptoContext != null
- && cryptoContext != Encryption.Context.NONE;
-
- if (!isCompressed && !isEncrypted) {
+ if (!fileContext.isCompressedOrEncrypted()) {
b.assumeUncompressed();
}
- if (verifyChecksum &&
- !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
+ if (verifyChecksum && !validateBlockChecksum(b, onDiskBlock, hdrSize)) {
return null; // checksum mismatch
}
- if (isCompressed || isEncrypted) {
- // This will allocate a new buffer but keep header bytes.
- b.allocateBuffer(nextBlockOnDiskSize > 0);
- if (b.blockType == BlockType.ENCODED_DATA) {
- encodedBlockDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(),
- b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock,
- hdrSize);
- } else {
- defaultDecodingCtx.prepareDecoding(b.getOnDiskSizeWithoutHeader(),
- b.getUncompressedSizeWithoutHeader(), b.getBufferWithoutHeader(), onDiskBlock,
- hdrSize);
- }
- if (nextBlockOnDiskSize > 0) {
- // Copy next block's header bytes into the new block if we have them.
- System.arraycopy(onDiskBlock, onDiskSizeWithHeader, b.buf.array(),
- b.buf.arrayOffset() + hdrSize
- + b.uncompressedSizeWithoutHeader + b.totalChecksumBytes(),
- hdrSize);
- }
- } else {
- // The onDiskBlock will become the headerAndDataBuffer for this block.
- // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
- // contains the header of next block, so no need to set next
- // block's header in it.
- b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0,
- onDiskSizeWithHeader), this.fileContext.isUseHBaseChecksum());
- }
+ // The onDiskBlock will become the headerAndDataBuffer for this block.
+ // If nextBlockOnDiskSizeWithHeader is not zero, the onDiskBlock already
+ // contains the header of next block, so no need to set next
+ // block's header in it.
+ b = new HFileBlock(ByteBuffer.wrap(onDiskBlock, 0, onDiskSizeWithHeader),
+ this.fileContext.isUseHBaseChecksum());
b.nextBlockOnDiskSizeWithHeader = nextBlockOnDiskSize;
// Set prefetched header
- if (b.nextBlockOnDiskSizeWithHeader > 0) {
+ if (b.hasNextBlockHeader()) {
prefetchedHeader.offset = offset + b.getOnDiskSizeWithHeader();
System.arraycopy(onDiskBlock, onDiskSizeWithHeader,
prefetchedHeader.header, 0, hdrSize);
@@ -1560,37 +1623,53 @@ public class HFileBlock implements Cacheable {
encodedBlockDecodingCtx =
encoder.newDataBlockDecodingContext(this.fileContext);
}
+ @Override
+ public HFileBlockDecodingContext getBlockDecodingContext() {
+ return this.encodedBlockDecodingCtx;
+ }
+
+ @Override
+ public HFileBlockDecodingContext getDefaultBlockDecodingContext() {
+ return this.defaultDecodingCtx;
+ }
+
/**
* Generates the checksum for the header as well as the data and
* then validates that it matches the value stored in the header.
* If there is a checksum mismatch, then return false. Otherwise
* return true.
*/
- protected boolean validateBlockChecksum(HFileBlock block,
- byte[] data, int hdrSize) throws IOException {
- return ChecksumUtil.validateBlockChecksum(path, block,
- data, hdrSize);
+ protected boolean validateBlockChecksum(HFileBlock block, byte[] data, int hdrSize)
+ throws IOException {
+ return ChecksumUtil.validateBlockChecksum(path, block, data, hdrSize);
}
@Override
public void closeStreams() throws IOException {
streamWrapper.close();
}
+
+ @Override
+ public String toString() {
+ return "FSReaderV2 [ hfs=" + hfs + " path=" + path + " fileContext=" + fileContext + " ]";
+ }
}
@Override
public int getSerializedLength() {
if (buf != null) {
- return this.buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE;
+ // include extra bytes for the next header when it's available.
+ int extraSpace = hasNextBlockHeader() ? headerSize() : 0;
+ return this.buf.limit() + extraSpace + HFileBlock.EXTRA_SERIALIZATION_SPACE;
}
return 0;
}
@Override
public void serialize(ByteBuffer destination) {
- ByteBuffer dupBuf = this.buf.duplicate();
- dupBuf.rewind();
- destination.put(dupBuf);
+ // assumes HeapByteBuffer
+ destination.put(this.buf.array(), this.buf.arrayOffset(),
+ getSerializedLength() - EXTRA_SERIALIZATION_SPACE);
serializeExtraInfo(destination);
}
@@ -1638,13 +1717,9 @@ public class HFileBlock implements Cacheable {
if (castedComparison.uncompressedSizeWithoutHeader !=
this.uncompressedSizeWithoutHeader) {
return false;
}
- if (this.buf.compareTo(castedComparison.buf) != 0) {
- return false;
- }
- if (this.buf.position() != castedComparison.buf.position()){
- return false;
- }
- if (this.buf.limit() != castedComparison.buf.limit()){
+ if (Bytes.compareTo(this.buf.array(), this.buf.arrayOffset(), this.buf.limit(),
+ castedComparison.buf.array(), castedComparison.buf.arrayOffset(),
+ castedComparison.buf.limit()) != 0) {
return false;
}
return true;
@@ -1665,6 +1740,7 @@ public class HFileBlock implements Cacheable {
return this.fileContext.getBytesPerChecksum();
}
+ /** @return the size of data on disk + header. Excludes checksum. */
int getOnDiskDataSizeWithHeader() {
return this.onDiskDataSizeWithHeader;
}
@@ -1718,6 +1794,10 @@ public class HFileBlock implements Cacheable {
return DUMMY_HEADER_NO_CHECKSUM;
}
+ /**
+ * @return the HFileContext used to create this HFileBlock. Not necessary the
+ * fileContext for the file from which this block's data was originally read.
+ */
public HFileContext getHFileContext() {
return this.fileContext;
}
@@ -1730,7 +1810,7 @@ public class HFileBlock implements Cacheable {
static String toStringHeader(ByteBuffer buf) throws IOException {
int offset = buf.arrayOffset();
byte[] b = buf.array();
- long magic = Bytes.toLong(b, offset);
+ long magic = Bytes.toLong(b, offset);
BlockType bt = BlockType.read(buf);
offset += Bytes.SIZEOF_LONG;
int compressedBlockSizeNoHeader = Bytes.toInt(b, offset);
@@ -1757,4 +1837,3 @@ public class HFileBlock implements Cacheable {
" onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
}
}
-
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 2992466..5d20cdf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -740,7 +740,7 @@ public class HFileBlockIndex {
* {@link #writeIndexBlocks(FSDataOutputStream)} has been called. The
* initial value accounts for the root level, and will be increased to two
* as soon as we find out there is a leaf-level in
- * {@link #blockWritten(long, int)}.
+ * {@link #blockWritten(long, int, int)}.
*/
private int numLevels = 1;
@@ -766,8 +766,8 @@ public class HFileBlockIndex {
/** Whether we require this block index to always be single-level. */
private boolean singleLevelOnly;
- /** Block cache, or null if cache-on-write is disabled */
- private BlockCache blockCache;
+ /** CacheConfig, or null if cache-on-write is disabled */
+ private CacheConfig cacheConf;
/** Name to use for computing cache keys */
private String nameForCaching;
@@ -782,18 +782,17 @@ public class HFileBlockIndex {
* Creates a multi-level block index writer.
*
* @param blockWriter the block writer to use to write index blocks
- * @param blockCache if this is not null, index blocks will be cached
- * on write into this block cache.
+ * @param cacheConf used to determine when and how a block should be cached-on-write.
*/
public BlockIndexWriter(HFileBlock.Writer blockWriter,
- BlockCache blockCache, String nameForCaching) {
- if ((blockCache == null) != (nameForCaching == null)) {
+ CacheConfig cacheConf, String nameForCaching) {
+ if ((cacheConf == null) != (nameForCaching == null)) {
throw new IllegalArgumentException("Block cache and file name for " +
"caching must be both specified or both null");
}
this.blockWriter = blockWriter;
- this.blockCache = blockCache;
+ this.cacheConf = cacheConf;
this.nameForCaching = nameForCaching;
this.maxChunkSize = HFileBlockIndex.DEFAULT_MAX_CHUNK_SIZE;
}
@@ -947,11 +946,11 @@ public class HFileBlockIndex {
byte[] curFirstKey = curChunk.getBlockKey(0);
blockWriter.writeHeaderAndData(out);
- if (blockCache != null) {
- HFileBlock blockForCaching = blockWriter.getBlockForCaching();
- blockCache.cacheBlock(new BlockCacheKey(nameForCaching,
- beginOffset, DataBlockEncoding.NONE,
- blockForCaching.getBlockType()), blockForCaching);
+ if (cacheConf != null) {
+ HFileBlock blockForCaching = blockWriter.getBlockForCaching(cacheConf);
+ cacheConf.getBlockCache().cacheBlock(new BlockCacheKey(nameForCaching,
+ beginOffset, DataBlockEncoding.NONE,
+ blockForCaching.getBlockType()), blockForCaching);
}
// Add intermediate index block size
@@ -1059,8 +1058,7 @@ public class HFileBlockIndex {
* entry referring to that block to the parent-level index.
*/
@Override
- public void blockWritten(long offset, int onDiskSize, int uncompressedSize)
- {
+ public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
// Add leaf index block size
totalBlockOnDiskSize += onDiskSize;
totalBlockUncompressedSize += uncompressedSize;
@@ -1125,7 +1123,7 @@ public class HFileBlockIndex {
*/
@Override
public boolean getCacheOnWrite() {
- return blockCache != null;
+ return cacheConf != null && cacheConf.shouldCacheIndexesOnWrite();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
index 41d5062..12565fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
@@ -283,6 +283,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
HFileBlock cachedBlock =
(HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey,
cacheBlock, false, true);
if (cachedBlock != null) {
+ assert cachedBlock.isUnpacked() : "Packed block leak.";
// Return a distinct 'shallow copy' of the block,
// so pos does not get messed by the scanner
return cachedBlock.getBufferWithoutHeader();
@@ -291,7 +292,7 @@ public class HFileReaderV2 extends AbstractHFileReader {
}
HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
- blockSize, -1, true);
+ blockSize, -1, true).unpack(hfileContext, fsBlockReader);
// Cache the block
if (cacheBlock) {
@@ -359,7 +360,10 @@ public class HFileReaderV2 extends AbstractHFileReader {
HFileBlock cachedBlock = (HFileBlock)
cacheConf.getBlockCache().getBlock(cacheKey,
cacheBlock, useLock, updateCacheMetrics);
if (cachedBlock != null) {
- validateBlockType(cachedBlock, expectedBlockType);
+ if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
+ cachedBlock = cachedBlock.unpack(hfileContext, fsBlockReader);
+ }
+ assert cachedBlock.isUnpacked() : "Packed block leak.";
if (cachedBlock.getBlockType().isData()) {
HFile.dataBlockReadCnt.incrementAndGet();
@@ -387,17 +391,21 @@ public class HFileReaderV2 extends AbstractHFileReader {
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset,
onDiskBlockSize, -1,
pread);
validateBlockType(hfileBlock, expectedBlockType);
+ HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
+ BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
// Cache the block if necessary
- if (cacheBlock && cacheConf.shouldCacheBlockOnRead(hfileBlock.getBlockType().getCategory())) {
- cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
+ if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+ cacheConf.getBlockCache().cacheBlock(cacheKey,
+ cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
+ cacheConf.isInMemory());
}
if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
HFile.dataBlockReadCnt.incrementAndGet();
}
- return hfileBlock;
+ return unpacked;
}
} finally {
traceScope.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index fdea542..21f0147 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -119,7 +119,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
- cacheIndexesOnWrite ? cacheConf.getBlockCache(): null,
+ cacheIndexesOnWrite ? cacheConf : null,
cacheIndexesOnWrite ? name : null);
dataBlockIndexWriter.setMaxChunkSize(
HFileBlockIndex.getMaxChunkSize(conf));
@@ -144,7 +144,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
newBlock();
}
- /** Clean up the current block */
+ /** Clean up the current data block */
private void finishBlock() throws IOException {
if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
return;
@@ -192,7 +192,7 @@ public class HFileWriterV2 extends AbstractHFileWriter {
* the cache key.
*/
private void doCacheOnWrite(long offset) {
- HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
+ HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(cacheConf);
cacheConf.getBlockCache().cacheBlock(
new BlockCacheKey(name, offset, blockEncoder.getDataBlockEncoding(),
cacheFormatBlock.getBlockType()), cacheFormatBlock);
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index bc3f8d3..a5fd71d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.base.Objects;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -325,12 +326,35 @@ public class LruBlockCache implements BlockCache, HeapSize {
cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
long newSize = updateSizeMetrics(cb, false);
map.put(cacheKey, cb);
- elements.incrementAndGet();
- if(newSize > acceptableSize() && !evictionInProgress) {
+ long val = elements.incrementAndGet();
+ if (LOG.isTraceEnabled()) {
+ long size = map.size();
+ assertCounterSanity(size, val);
+ }
+ if (newSize > acceptableSize() && !evictionInProgress) {
runEviction();
}
}
+ /**
+ * Sanity-checking for parity between actual block cache content and metrics.
+ * Intended only for use with TRACE level logging and -ea JVM.
+ */
+ private static void assertCounterSanity(long mapSize, long counterVal) {
+ if (counterVal < 0) {
+ LOG.trace("counterVal overflow. Assertions unreliable. counterVal=" + counterVal +
+ ", mapSize=" + mapSize);
+ return;
+ }
+ if (mapSize < Integer.MAX_VALUE) {
+ double pct_diff = Math.abs((((double) counterVal) / ((double) mapSize)) - 1.);
+ if (pct_diff > 0.05) {
+ LOG.trace("delta between reported and actual size > 5%. counterVal=" + counterVal +
+ ", mapSize=" + mapSize);
+ }
+ }
+ }
+
private int compare(Cacheable left, Cacheable right) {
ByteBuffer l = ByteBuffer.allocate(left.getSerializedLength());
left.serialize(l);
@@ -447,7 +471,11 @@ public class LruBlockCache implements BlockCache, HeapSize {
protected long evictBlock(LruCachedBlock block, boolean evictedByEvictionProcess) {
map.remove(block.getCacheKey());
updateSizeMetrics(block, true);
- elements.decrementAndGet();
+ long val = elements.decrementAndGet();
+ if (LOG.isTraceEnabled()) {
+ long size = map.size();
+ assertCounterSanity(size, val);
+ }
stats.evicted();
if (evictedByEvictionProcess && victimHandler != null) {
boolean wait = getCurrentSize() < acceptableSize();
@@ -491,9 +519,12 @@ public class LruBlockCache implements BlockCache, HeapSize {
if(bytesToFree <= 0) return;
// Instantiate priority buckets
- BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize());
- BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize());
- BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize());
+ BlockBucket bucketSingle = new BlockBucket("single", bytesToFree, blockSize,
+ singleSize());
+ BlockBucket bucketMulti = new BlockBucket("multi", bytesToFree, blockSize,
+ multiSize());
+ BlockBucket bucketMemory = new BlockBucket("memory", bytesToFree, blockSize,
+ memorySize());
// Scan entire map putting into appropriate buckets
for(LruCachedBlock cachedBlock : map.values()) {
@@ -522,7 +553,15 @@ public class LruBlockCache implements BlockCache, HeapSize {
// so the single and multi buckets will be emptied
bytesFreed = bucketSingle.free(s);
bytesFreed += bucketMulti.free(m);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
+ " from single and multi buckets");
+ }
bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("freed " + StringUtils.byteDesc(bytesFreed) +
+ " total from all three buckets ");
+ }
} else {
// this means no need to evict block in memory bucket,
// and we try best to make the ratio between single-bucket and
@@ -584,6 +623,23 @@ public class LruBlockCache implements BlockCache, HeapSize {
}
}
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("blockCount", getBlockCount())
+ .add("currentSize", getCurrentSize())
+ .add("freeSize", getFreeSize())
+ .add("maxSize", getMaxSize())
+ .add("heapSize", heapSize())
+ .add("minSize", minSize())
+ .add("minFactor", minFactor)
+ .add("multiSize", multiSize())
+ .add("multiFactor", multiFactor)
+ .add("singleSize", singleSize())
+ .add("singleFactor", singleFactor)
+ .toString();
+ }
+
/**
* Used to group blocks into priority buckets. There will be a BlockBucket
* for each priority (single, multi, memory). Once bucketed, the eviction
@@ -591,11 +647,14 @@ public class LruBlockCache implements BlockCache, HeapSize {
* to configuration parameters and their relatives sizes.
*/
private class BlockBucket implements Comparable<BlockBucket> {
+
+ private final String name;
private LruCachedBlockQueue queue;
private long totalSize = 0;
private long bucketSize;
- public BlockBucket(long bytesToFree, long blockSize, long bucketSize) {
+ public BlockBucket(String name, long bytesToFree, long blockSize, long bucketSize) {
+ this.name = name;
this.bucketSize = bucketSize;
queue = new LruCachedBlockQueue(bytesToFree, blockSize);
totalSize = 0;
@@ -607,6 +666,9 @@ public class LruBlockCache implements BlockCache, HeapSize {
}
public long free(long toFree) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("freeing " + StringUtils.byteDesc(toFree) + " from " + this);
+ }
LruCachedBlock cb;
long freedBytes = 0;
while ((cb = queue.pollLast()) != null) {
@@ -615,6 +677,9 @@ public class LruBlockCache implements BlockCache, HeapSize {
return freedBytes;
}
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("freed " + StringUtils.byteDesc(freedBytes) + " from " +
this);
+ }
return freedBytes;
}
@@ -636,13 +701,22 @@ public class LruBlockCache implements BlockCache,
HeapSize {
if (that == null || !(that instanceof BlockBucket)){
return false;
}
+
return compareTo((BlockBucket)that) == 0;
}
@Override
public int hashCode() {
- // Nothing distingushing about each instance unless I pass in a 'name'
or something
- return super.hashCode();
+ return Objects.hashCode(name, bucketSize, queue, totalSize);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("name", name)
+ .add("totalSize", StringUtils.byteDesc(totalSize))
+ .add("bucketSize", StringUtils.byteDesc(bucketSize))
+ .toString();
}
}
@@ -757,7 +831,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
LruBlockCache.LOG.debug("Total=" + StringUtils.byteDesc(totalSize) + ", " +
"free=" + StringUtils.byteDesc(freeSize) + ", " +
"max=" + StringUtils.byteDesc(this.maxSize) + ", " +
- "blocks=" + size() +", " +
+ "blockCount=" + getBlockCount() + ", " +
"accesses=" + stats.getRequestCount() + ", " +
"hits=" + stats.getHitCount() + ", " +
"hitRatio=" +
@@ -920,6 +994,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
}
/** Clears the cache. Used in tests. */
+ @VisibleForTesting
public void clearCache() {
map.clear();
}
@@ -928,6 +1003,7 @@ public class LruBlockCache implements BlockCache, HeapSize
{
* Used in testing. May be very inefficient.
* @return the set of cached file names
*/
+ @VisibleForTesting
SortedSet<String> getCachedFileNamesForTest() {
SortedSet<String> fileNames = new TreeSet<String>();
for (BlockCacheKey cacheKey : map.keySet()) {
@@ -948,6 +1024,7 @@ public class LruBlockCache implements BlockCache, HeapSize
{
return counts;
}
+ @VisibleForTesting
public Map<DataBlockEncoding, Integer> getEncodingCountsForTest() {
Map<DataBlockEncoding, Integer> counts =
new EnumMap<DataBlockEncoding, Integer>(DataBlockEncoding.class);
@@ -964,6 +1041,11 @@ public class LruBlockCache implements BlockCache,
HeapSize {
victimHandler = handler;
}
+ @VisibleForTesting
+ Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
+ return map;
+ }
+
@Override
public BlockCache[] getBlockCaches() {
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 1bac569..c0e5ade 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.io.hfile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
@@ -84,6 +85,7 @@ public class TestCacheOnWrite {
private final Compression.Algorithm compress;
private final BlockEncoderTestType encoderType;
private final HFileDataBlockEncoder encoder;
+ private final boolean cacheCompressedData;
private static final int DATA_BLOCK_SIZE = 2048;
private static final int NUM_KV = 25000;
@@ -154,14 +156,15 @@ public class TestCacheOnWrite {
}
}
- public TestCacheOnWrite(CacheOnWriteType cowType,
- Compression.Algorithm compress, BlockEncoderTestType encoderType) {
+ public TestCacheOnWrite(CacheOnWriteType cowType, Compression.Algorithm
compress,
+ BlockEncoderTestType encoderType, boolean cacheCompressedData) {
this.cowType = cowType;
this.compress = compress;
this.encoderType = encoderType;
this.encoder = encoderType.getEncoder();
- testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
- ", encoderType=" + encoderType + "]";
+ this.cacheCompressedData = cacheCompressedData;
+ testDescription = "[cacheOnWrite=" + cowType + ", compress=" + compress +
+ ", encoderType=" + encoderType + ", cacheCompressedData=" +
cacheCompressedData + "]";
System.out.println(testDescription);
}
@@ -173,7 +176,9 @@ public class TestCacheOnWrite {
HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
for (BlockEncoderTestType encoderType :
BlockEncoderTestType.values()) {
- cowTypes.add(new Object[] { cowType, compress, encoderType });
+ for (boolean cacheCompressedData : new boolean[] { false, true }) {
+ cowTypes.add(new Object[] { cowType, compress, encoderType,
cacheCompressedData });
+ }
}
}
}
@@ -189,11 +194,12 @@ public class TestCacheOnWrite {
conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
BLOOM_BLOCK_SIZE);
conf.setBoolean(CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY,
- cowType.shouldBeCached(BlockType.DATA));
+ cowType.shouldBeCached(BlockType.DATA));
conf.setBoolean(CacheConfig.CACHE_INDEX_BLOCKS_ON_WRITE_KEY,
cowType.shouldBeCached(BlockType.LEAF_INDEX));
conf.setBoolean(CacheConfig.CACHE_BLOOM_BLOCKS_ON_WRITE_KEY,
cowType.shouldBeCached(BlockType.BLOOM_CHUNK));
+ conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY,
cacheCompressedData);
cowType.modifyConf(conf);
fs = HFileSystem.get(conf);
cacheConf = new CacheConfig(conf);
@@ -225,6 +231,10 @@ public class TestCacheOnWrite {
reader = (HFileReaderV2) HFile.createReader(fs, storeFilePath,
cacheConf, conf);
}
LOG.info("HFile information: " + reader);
+ HFileContext meta = new HFileContextBuilder().withCompression(compress)
+ .withBytesPerCheckSum(CKBYTES).withChecksumType(ChecksumType.NULL)
+
.withBlockSize(DATA_BLOCK_SIZE).withDataBlockEncoding(encoder.getDataBlockEncoding())
+ .withIncludesTags(useTags).build();
final boolean cacheBlocks = false;
final boolean pread = false;
HFileScanner scanner = reader.getScanner(cacheBlocks, pread);
@@ -247,17 +257,37 @@ public class TestCacheOnWrite {
HFileBlock block = reader.readBlock(offset, onDiskSize, false, true,
false, true, null);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
- offset, encodingInCache, block.getBlockType());
- boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true)
!= null;
+ offset, encodingInCache, block.getBlockType());
+ HFileBlock fromCache = (HFileBlock) blockCache.getBlock(blockCacheKey,
true, false, true);
+ boolean isCached = fromCache != null;
boolean shouldBeCached = cowType.shouldBeCached(block.getBlockType());
- if (shouldBeCached != isCached) {
- throw new AssertionError(
- "shouldBeCached: " + shouldBeCached+ "\n" +
- "isCached: " + isCached + "\n" +
- "Test description: " + testDescription + "\n" +
- "block: " + block + "\n" +
- "encodingInCache: " + encodingInCache + "\n" +
- "blockCacheKey: " + blockCacheKey);
+ assertTrue("shouldBeCached: " + shouldBeCached+ "\n" +
+ "isCached: " + isCached + "\n" +
+ "Test description: " + testDescription + "\n" +
+ "block: " + block + "\n" +
+ "encodingInCache: " + encodingInCache + "\n" +
+ "blockCacheKey: " + blockCacheKey,
+ shouldBeCached == isCached);
+ if (isCached) {
+ if
(cacheConf.shouldCacheCompressed(fromCache.getBlockType().getCategory())) {
+ if (compress != Compression.Algorithm.NONE) {
+ assertFalse(fromCache.isUnpacked());
+ }
+ fromCache = fromCache.unpack(meta, reader.getUncachedBlockReader());
+ } else {
+ assertTrue(fromCache.isUnpacked());
+ }
+ // block we cached at write-time and block read from file should be
identical
+ assertEquals(block.getChecksumType(), fromCache.getChecksumType());
+ assertEquals(block.getBlockType(), fromCache.getBlockType());
+ if (block.getBlockType() == BlockType.ENCODED_DATA) {
+ assertEquals(block.getDataBlockEncodingId(),
fromCache.getDataBlockEncodingId());
+ assertEquals(block.getDataBlockEncoding(),
fromCache.getDataBlockEncoding());
+ }
+ assertEquals(block.getOnDiskSizeWithHeader(),
fromCache.getOnDiskSizeWithHeader());
+ assertEquals(block.getOnDiskSizeWithoutHeader(),
fromCache.getOnDiskSizeWithoutHeader());
+ assertEquals(
+ block.getUncompressedSizeWithoutHeader(),
fromCache.getUncompressedSizeWithoutHeader());
}
prevBlock = block;
offset += block.getOnDiskSizeWithHeader();
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index 020a293..f70976f 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -125,7 +125,7 @@ public class TestChecksum {
assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
// read data back from the hfile, exclude header and checksum
- ByteBuffer bb = b.getBufferWithoutHeader(); // read back data
+ ByteBuffer bb = b.unpack(meta, hbr).getBufferWithoutHeader(); // read
back data
DataInputStream in = new DataInputStream(
new ByteArrayInputStream(
bb.array(), bb.arrayOffset(), bb.limit()));
@@ -164,6 +164,7 @@ public class TestChecksum {
b = hbr.readBlockData(0, -1, -1, pread);
is.close();
b.sanityCheck();
+ b = b.unpack(meta, hbr);
assertEquals(4936, b.getUncompressedSizeWithoutHeader());
assertEquals(algo == GZ ? 2173 : 4936,
b.getOnDiskSizeWithoutHeader() - b.totalChecksumBytes());
@@ -274,12 +275,7 @@ public class TestChecksum {
// validate data
for (int i = 0; i < 1234; i++) {
int val = in.readInt();
- if (val != i) {
- String msg = "testChecksumCorruption: data mismatch at index " +
- i + " expected " + i + " found " + val;
- LOG.warn(msg);
- assertEquals(i, val);
- }
+ assertEquals("testChecksumCorruption: data mismatch at index " + i, i,
val);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
index 665a7f3..d7eb0c4 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestForceCacheImportantBlocks.java
@@ -74,14 +74,15 @@ public class TestForceCacheImportantBlocks {
@Parameters
public static Collection<Object[]> parameters() {
// HFile versions
- return Arrays.asList(new Object[][] {
- new Object[] { new Integer(2), false },
- new Object[] { new Integer(2), true }
- });
+ return Arrays.asList(
+ new Object[] { 2, true },
+ new Object[] { 2, false },
+ new Object[] { 3, true },
+ new Object[] { 3, false }
+ );
}
- public TestForceCacheImportantBlocks(int hfileVersion,
- boolean cfCacheEnabled) {
+ public TestForceCacheImportantBlocks(int hfileVersion, boolean
cfCacheEnabled) {
this.hfileVersion = hfileVersion;
this.cfCacheEnabled = cfCacheEnabled;
TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY,
@@ -119,7 +120,6 @@ public class TestForceCacheImportantBlocks {
}
}
-
private void writeTestData(HRegion region) throws IOException {
for (int i = 0; i < NUM_ROWS; ++i) {
Put put = new Put(Bytes.toBytes("row" + i));
@@ -135,6 +135,4 @@ public class TestForceCacheImportantBlocks {
}
}
}
-
}
-
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8851309/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
----------------------------------------------------------------------
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index 20463f7..f46bf13 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -301,7 +301,8 @@ public class TestHFile extends HBaseTestCase {
ByteBuffer actual = reader.getMetaBlock("HFileMeta" + i, false);
ByteBuffer expected =
ByteBuffer.wrap(("something to test" + i).getBytes());
- assertTrue("failed to match metadata", actual.compareTo(expected) == 0);
+ assertEquals("failed to match metadata",
+ Bytes.toStringBinary(expected), Bytes.toStringBinary(actual));
}
}