This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new e25b2a7cd0b HBASE-27370 Avoid decompressing blocks when reading from bucket cache… (#4781)
e25b2a7cd0b is described below
commit e25b2a7cd0b0daddad5ab09a87cd7ed9f59c81b6
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Tue Sep 20 09:15:17 2022 +0100
HBASE-27370 Avoid decompressing blocks when reading from bucket cache… (#4781)
Co-authored-by: Josh Elser <[email protected]>
Signed-off-by: Peter Somogyi <[email protected]>
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
---
.../org/apache/hadoop/hbase/io/hfile/HFile.java | 4 ++
.../hadoop/hbase/io/hfile/HFilePreadReader.java | 2 +-
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 44 +++++++++---
.../hadoop/hbase/io/hfile/TestHFileBlockIndex.java | 8 +++
.../apache/hadoop/hbase/io/hfile/TestPrefetch.java | 83 ++++++++++++++++++----
5 files changed, 118 insertions(+), 23 deletions(-)
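Note for reviewers: the change below only avoids decompression when HBase is configured to cache data blocks in their on-disk (packed) form. A minimal configuration sketch, assuming an already-built BlockCache instance named blockCache (the config key comes from CacheConfig, as exercised by the TestPrefetch change further down; everything else here is illustrative, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;

// Cache data blocks as stored on disk (compressed/encoded) instead of unpacking them first.
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
// blockCache is assumed to exist already (e.g. a bucket-cache-backed combined cache).
CacheConfig cacheConf = new CacheConfig(conf, blockCache);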
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index f85636ec448..d18194e95c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -373,6 +373,10 @@ public final class HFile {
HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException;
+
+ HFileBlock readBlock(long offset, long onDiskBlockSize, boolean cacheBlock, final boolean pread,
+ final boolean isCompaction, final boolean updateCacheMetrics, BlockType expectedBlockType,
+ DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException;
}
/** An interface used by clients to open and iterate an {@link HFile}. */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 98401c46bee..0eb2aa7db00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -57,7 +57,7 @@ public class HFilePreadReader extends HFileReaderImpl {
// next header, will not have happened...so, pass in the onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely I'd say.
HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true,
- /* pread= */true, false, false, null, null);
+ /* pread= */true, false, false, null, null, true);
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
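For orientation, the prefetch loop this hunk modifies walks the file block by block and now passes cacheOnly=true, so each block may be cached in its packed form without ever being unpacked. A rough sketch of that loop shape, written against a plain HFile.Reader for illustration (the real code runs inside HFilePreadReader itself, and exception handling is omitted):

// Illustrative prefetch loop: read every block up to the load-on-open section so that
// readBlock caches it; cacheOnly=true (last argument) tells the reader the caller does
// not need the block contents, so the block may stay compressed.
long offset = 0;
long end = reader.getTrailer().getLoadOnOpenDataOffset();
long onDiskSizeOfNextBlock = -1;
while (offset < end) {
  HFileBlock block = reader.readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */ true,
    /* pread= */ true, /* isCompaction= */ false, /* updateCacheMetrics= */ false,
    /* expectedBlockType= */ null, /* expectedDataBlockEncoding= */ null, /* cacheOnly= */ true);
  onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
  offset += block.getOnDiskSizeWithHeader();
}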
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 4deee6e029d..e64bdd8a59d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1082,7 +1082,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
* and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
*/
private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
- boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
+ boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
// Check cache for block. If found return.
BlockCache cache = cacheConf.getBlockCache().orElse(null);
@@ -1187,7 +1187,7 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
cacheBlock &= cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
HFileBlock cachedBlock =
- getCachedBlock(cacheKey, cacheBlock, false, true, true, BlockType.META, null);
+ getCachedBlock(cacheKey, cacheBlock, false, true, BlockType.META, null);
if (cachedBlock != null) {
assert cachedBlock.isUnpacked() : "Packed block leak.";
// Return a distinct 'shallow copy' of the block,
@@ -1234,6 +1234,15 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+ return readBlock(dataBlockOffset, onDiskBlockSize, cacheBlock, pread, isCompaction,
+ updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding, false);
+ }
+
+ @Override
+ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock,
+ boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
+ BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly)
+ throws IOException {
if (dataBlockIndexReader == null) {
throw new IOException(path + " block index not loaded");
}
@@ -1257,17 +1266,18 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
try (TraceScope traceScope = TraceUtil.createTrace("HFileReaderImpl.readBlock")) {
while (true) {
// Check cache for block. If found return.
- if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
+ if (cacheConf.shouldReadBlockFromCache(expectedBlockType) && !cacheOnly) {
if (useLock) {
lockEntry = offsetLock.getLockEntry(dataBlockOffset);
}
// Try and get the block from the block cache. If the useLock variable is true then this
// is the second time through the loop and it should not be counted as a block cache miss.
- HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, isCompaction,
- updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
+ HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics,
+ expectedBlockType, expectedDataBlockEncoding);
if (cachedBlock != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("From Cache " + cachedBlock);
+ LOG.trace("Block for file {} is coming from Cache {}",
+ Bytes.toString(cachedBlock.getHFileContext().getTableName()), cachedBlock);
}
TraceUtil.addTimelineAnnotation("blockCacheHit");
assert cachedBlock.isUnpacked() : "Packed block leak.";
@@ -1304,14 +1314,30 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType));
validateBlockType(hfileBlock, expectedBlockType);
- HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory();
+ final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
+ final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
+
+ // Don't need the unpacked block back and we're storing the block in the cache compressed
+ if (cacheOnly && cacheCompressed && cacheOnRead) {
+ LOG.debug("Skipping decompression of block in prefetch");
+ // Cache the block if necessary
+ cacheConf.getBlockCache().ifPresent(cache -> {
+ if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+ cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
+ }
+ });
+ if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
+ HFile.DATABLOCK_READ_COUNT.increment();
+ }
+ return hfileBlock;
+ }
+ HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
- cache.cacheBlock(cacheKey,
- cacheConf.shouldCacheCompressed(category) ? hfileBlock : unpacked,
+ cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory());
}
});
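Put differently, the new branch above skips unpack() only when the caller asked for cache-only behaviour and the cache is configured to hold packed blocks anyway; otherwise the existing unpack-then-cache path is unchanged. A condensed restatement of that logic (paraphrasing the hunk above, not a verbatim excerpt; all variables come from the surrounding readBlock method):

// cacheOnly: the caller (prefetch) does not need the block contents, only wants it cached.
final boolean cacheCompressed = cacheConf.shouldCacheCompressed(category);
final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
if (cacheOnly && cacheCompressed && cacheOnRead) {
  // Cache the block exactly as read from disk (still compressed) and return without unpacking.
  cacheConf.getBlockCache().ifPresent(cache -> {
    if (cacheBlock && cacheOnRead) {
      cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
    }
  });
  return hfileBlock;
}
// Normal path: unpack first, then cache either the packed or the unpacked form per configuration.
HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);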
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 001099df664..031ede45dcf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -176,6 +176,14 @@ public class TestHFileBlockIndex {
public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+ return readBlock(offset, onDiskSize, cacheBlock, pread, isCompaction, updateCacheMetrics,
+ expectedBlockType, expectedDataBlockEncoding, false);
+ }
+
+ @Override
+ public HFileBlock readBlock(long offset, long onDiskSize, boolean cacheBlock, boolean pread,
+ boolean isCompaction, boolean updateCacheMetrics, BlockType expectedBlockType,
+ DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws IOException {
if (offset == prevOffset && onDiskSize == prevOnDiskSize && pread == prevPread) {
hitCount += 1;
return prevBlock;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 0657799a5f9..ee9fd5b7803 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,6 +38,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -112,6 +117,51 @@ public class TestPrefetch {
}
private void readStoreFile(Path storeFilePath) throws Exception {
+ readStoreFile(storeFilePath, (r, o) -> {
+ HFileBlock block = null;
+ try {
+ block = r.readBlock(o, -1, false, true, false, true, null, null);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ return block;
+ }, (key, block) -> {
+ boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+ if (
+ block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
+ || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+ ) {
+ assertTrue(isCached);
+ }
+ });
+ }
+
+ private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
+ readStoreFile(storeFilePath, (r, o) -> {
+ HFileBlock block = null;
+ try {
+ block = r.readBlock(o, -1, false, true, false, true, null, null, true);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ return block;
+ }, (key, block) -> {
+ boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+ if (block.getBlockType() == BlockType.DATA) {
+ assertFalse(block.isUnpacked());
+ } else if (
+ block.getBlockType() == BlockType.ROOT_INDEX
+ || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+ ) {
+ assertTrue(block.isUnpacked());
+ }
+ assertTrue(isCached);
+ });
+ }
+
+ private void readStoreFile(Path storeFilePath,
+ BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
+ BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception {
// Open the file
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, true, conf);
@@ -119,29 +169,36 @@ public class TestPrefetch {
// Sleep for a bit
Thread.sleep(1000);
}
-
- // Check that all of the data blocks were preloaded
- BlockCache blockCache = cacheConf.getBlockCache().get();
long offset = 0;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
- HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, null, null);
+ HFileBlock block = readFunction.apply(reader, offset);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
- boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true) != null;
- if (
- block.getBlockType() == BlockType.DATA || block.getBlockType() == BlockType.ROOT_INDEX
- || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
- ) {
- assertTrue(isCached);
- }
+ validationFunction.accept(blockCacheKey, block);
offset += block.getOnDiskSizeWithHeader();
}
}
+ @Test
+ public void testPrefetchCompressed() throws Exception {
+ conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
+ cacheConf = new CacheConfig(conf, blockCache);
+ HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
+ .withBlockSize(DATA_BLOCK_SIZE).build();
+ Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
+ readStoreFileCacheOnly(storeFile);
+ conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
+
+ }
+
private Path writeStoreFile(String fname) throws IOException {
- Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
HFileContext meta = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+ return writeStoreFile(fname, meta);
+ }
+
+ private Path writeStoreFile(String fname, HFileContext context) throws IOException {
+ Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
- .withOutputDir(storeFileParentDir).withFileContext(meta).build();
+ .withOutputDir(storeFileParentDir).withFileContext(context).build();
Random rand = ThreadLocalRandom.current();
final int rowLen = 32;
for (int i = 0; i < NUM_KV; ++i) {