This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new f8c0c35fce0 HBASE-27370 Avoid decompressing blocks when reading from
bucket cache… (#4781)
f8c0c35fce0 is described below
commit f8c0c35fce0e710491035001d2622173eaf4f897
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Tue Sep 20 09:15:17 2022 +0100
HBASE-27370 Avoid decompressing blocks when reading from bucket cache…
(#4781)
Co-authored-by: Josh Elser <[email protected]>
Signed-off-by: Peter Somogyi <[email protected]>
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
---
.../org/apache/hadoop/hbase/io/hfile/HFile.java | 4 ++
.../hadoop/hbase/io/hfile/HFilePreadReader.java | 2 +-
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 44 +++++++++---
.../hadoop/hbase/io/hfile/TestHFileBlockIndex.java | 8 +++
.../apache/hadoop/hbase/io/hfile/TestPrefetch.java | 83 ++++++++++++++++++----
5 files changed, 118 insertions(+), 23 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 89a407b3389..73346e8ae4a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -373,6 +373,10 @@ public final class HFile {
HFileBlock readBlock(long offset, long onDiskBlockSize, boolean
cacheBlock, final boolean pread,
final boolean isCompaction, final boolean updateCacheMetrics, BlockType
expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException;
+
+ HFileBlock readBlock(long offset, long onDiskBlockSize, boolean
cacheBlock, final boolean pread,
+ final boolean isCompaction, final boolean updateCacheMetrics, BlockType
expectedBlockType,
+ DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws
IOException;
}
/** An interface used by clients to open and iterate an {@link HFile}. */
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 98401c46bee..0eb2aa7db00 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -57,7 +57,7 @@ public class HFilePreadReader extends HFileReaderImpl {
// next header, will not have happened...so, pass in the
onDiskSize gotten from the
// cached block. This 'optimization' triggers extremely rarely
I'd say.
HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /*
cacheBlock= */true,
- /* pread= */true, false, false, null, null);
+ /* pread= */true, false, false, null, null, true);
try {
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index 63d1cee2b13..2cf1c3df677 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1084,7 +1084,7 @@ public abstract class HFileReaderImpl implements
HFile.Reader, Configurable {
* and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block
as necessary.
*/
private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean
cacheBlock, boolean useLock,
- boolean isCompaction, boolean updateCacheMetrics, BlockType
expectedBlockType,
+ boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
// Check cache for block. If found return.
BlockCache cache = cacheConf.getBlockCache().orElse(null);
@@ -1189,7 +1189,7 @@ public abstract class HFileReaderImpl implements
HFile.Reader, Configurable {
cacheBlock &=
cacheConf.shouldCacheBlockOnRead(BlockType.META.getCategory());
HFileBlock cachedBlock =
- getCachedBlock(cacheKey, cacheBlock, false, true, true,
BlockType.META, null);
+ getCachedBlock(cacheKey, cacheBlock, false, true, BlockType.META,
null);
if (cachedBlock != null) {
assert cachedBlock.isUnpacked() : "Packed block leak.";
// Return a distinct 'shallow copy' of the block,
@@ -1236,6 +1236,15 @@ public abstract class HFileReaderImpl implements
HFile.Reader, Configurable {
public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
final boolean cacheBlock,
boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding)
throws IOException {
+ return readBlock(dataBlockOffset, onDiskBlockSize, cacheBlock, pread,
isCompaction,
+ updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding, false);
+ }
+
+ @Override
+ public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
final boolean cacheBlock,
+ boolean pread, final boolean isCompaction, boolean updateCacheMetrics,
+ BlockType expectedBlockType, DataBlockEncoding expectedDataBlockEncoding,
boolean cacheOnly)
+ throws IOException {
if (dataBlockIndexReader == null) {
throw new IOException(path + " block index not loaded");
}
@@ -1261,17 +1270,18 @@ public abstract class HFileReaderImpl implements
HFile.Reader, Configurable {
try {
while (true) {
// Check cache for block. If found return.
- if (cacheConf.shouldReadBlockFromCache(expectedBlockType)) {
+ if (cacheConf.shouldReadBlockFromCache(expectedBlockType) &&
!cacheOnly) {
if (useLock) {
lockEntry = offsetLock.getLockEntry(dataBlockOffset);
}
// Try and get the block from the block cache. If the useLock
variable is true then this
// is the second time through the loop and it should not be counted
as a block cache miss.
- HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock,
useLock, isCompaction,
- updateCacheMetrics, expectedBlockType, expectedDataBlockEncoding);
+ HFileBlock cachedBlock = getCachedBlock(cacheKey, cacheBlock,
useLock, updateCacheMetrics,
+ expectedBlockType, expectedDataBlockEncoding);
if (cachedBlock != null) {
if (LOG.isTraceEnabled()) {
- LOG.trace("From Cache {}", cachedBlock);
+ LOG.trace("Block for file {} is coming from Cache {}",
+ Bytes.toString(cachedBlock.getHFileContext().getTableName()),
cachedBlock);
}
span.addEvent("block cache hit", attributes);
assert cachedBlock.isUnpacked() : "Packed block leak.";
@@ -1308,14 +1318,30 @@ public abstract class HFileReaderImpl implements
HFile.Reader, Configurable {
HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset,
onDiskBlockSize, pread,
!isCompaction, shouldUseHeap(expectedBlockType));
validateBlockType(hfileBlock, expectedBlockType);
- HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
BlockType.BlockCategory category =
hfileBlock.getBlockType().getCategory();
+ final boolean cacheCompressed =
cacheConf.shouldCacheCompressed(category);
+ final boolean cacheOnRead = cacheConf.shouldCacheBlockOnRead(category);
+
+ // Don't need the unpacked block back and we're storing the block in
the cache compressed
+ if (cacheOnly && cacheCompressed && cacheOnRead) {
+ LOG.debug("Skipping decompression of block in prefetch");
+ // Cache the block if necessary
+ cacheConf.getBlockCache().ifPresent(cache -> {
+ if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+ cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory());
+ }
+ });
+ if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
+ HFile.DATABLOCK_READ_COUNT.increment();
+ }
+ return hfileBlock;
+ }
+ HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
// Cache the block if necessary
cacheConf.getBlockCache().ifPresent(cache -> {
if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
- cache.cacheBlock(cacheKey,
- cacheConf.shouldCacheCompressed(category) ? hfileBlock :
unpacked,
+ cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
cacheConf.isInMemory());
}
});
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 4aedfb959a9..2009c97ab55 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -175,6 +175,14 @@ public class TestHFileBlockIndex {
public HFileBlock readBlock(long offset, long onDiskSize, boolean
cacheBlock, boolean pread,
boolean isCompaction, boolean updateCacheMetrics, BlockType
expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
+ return readBlock(offset, onDiskSize, cacheBlock, pread, isCompaction,
updateCacheMetrics,
+ expectedBlockType, expectedDataBlockEncoding, false);
+ }
+
+ @Override
+ public HFileBlock readBlock(long offset, long onDiskSize, boolean
cacheBlock, boolean pread,
+ boolean isCompaction, boolean updateCacheMetrics, BlockType
expectedBlockType,
+ DataBlockEncoding expectedDataBlockEncoding, boolean cacheOnly) throws
IOException {
if (offset == prevOffset && onDiskSize == prevOnDiskSize && pread ==
prevPread) {
hitCount += 1;
return prevBlock;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 1e4f675b238..9844ebbf42f 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.io.hfile;
import static
org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static
org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId;
+import static
org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
@@ -26,6 +27,7 @@ import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
@@ -34,6 +36,8 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -47,6 +51,7 @@ import
org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.trace.StringTraceRenderer;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -148,6 +153,51 @@ public class TestPrefetch {
}
private void readStoreFile(Path storeFilePath) throws Exception {
+ readStoreFile(storeFilePath, (r, o) -> {
+ HFileBlock block = null;
+ try {
+ block = r.readBlock(o, -1, false, true, false, true, null, null);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ return block;
+ }, (key, block) -> {
+ boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+ if (
+ block.getBlockType() == BlockType.DATA || block.getBlockType() ==
BlockType.ROOT_INDEX
+ || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+ ) {
+ assertTrue(isCached);
+ }
+ });
+ }
+
+ private void readStoreFileCacheOnly(Path storeFilePath) throws Exception {
+ readStoreFile(storeFilePath, (r, o) -> {
+ HFileBlock block = null;
+ try {
+ block = r.readBlock(o, -1, false, true, false, true, null, null, true);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ return block;
+ }, (key, block) -> {
+ boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+ if (block.getBlockType() == BlockType.DATA) {
+ assertFalse(block.isUnpacked());
+ } else if (
+ block.getBlockType() == BlockType.ROOT_INDEX
+ || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+ ) {
+ assertTrue(block.isUnpacked());
+ }
+ assertTrue(isCached);
+ });
+ }
+
+ private void readStoreFile(Path storeFilePath,
+ BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
+ BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception
{
// Open the file
HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf,
true, conf);
@@ -155,29 +205,36 @@ public class TestPrefetch {
// Sleep for a bit
Thread.sleep(1000);
}
-
- // Check that all of the data blocks were preloaded
- BlockCache blockCache = cacheConf.getBlockCache().get();
long offset = 0;
while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
- HFileBlock block = reader.readBlock(offset, -1, false, true, false,
true, null, null);
+ HFileBlock block = readFunction.apply(reader, offset);
BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
offset);
- boolean isCached = blockCache.getBlock(blockCacheKey, true, false, true)
!= null;
- if (
- block.getBlockType() == BlockType.DATA || block.getBlockType() ==
BlockType.ROOT_INDEX
- || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
- ) {
- assertTrue(isCached);
- }
+ validationFunction.accept(blockCacheKey, block);
offset += block.getOnDiskSizeWithHeader();
}
}
+ @Test
+ public void testPrefetchCompressed() throws Exception {
+ conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, true);
+ cacheConf = new CacheConfig(conf, blockCache);
+ HFileContext context = new
HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
+ .withBlockSize(DATA_BLOCK_SIZE).build();
+ Path storeFile = writeStoreFile("TestPrefetchCompressed", context);
+ readStoreFileCacheOnly(storeFile);
+ conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
+
+ }
+
private Path writeStoreFile(String fname) throws IOException {
- Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
HFileContext meta = new
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+ return writeStoreFile(fname, meta);
+ }
+
+ private Path writeStoreFile(String fname, HFileContext context) throws
IOException {
+ Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
- .withOutputDir(storeFileParentDir).withFileContext(meta).build();
+ .withOutputDir(storeFileParentDir).withFileContext(context).build();
Random rand = ThreadLocalRandom.current();
final int rowLen = 32;
for (int i = 0; i < NUM_KV; ++i) {