This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 73cb0dddf8b HBASE-28303 Interrupt cache prefetch thread when a heap
usage threshold is reached (#5615)
73cb0dddf8b is described below
commit 73cb0dddf8bcd0768e2e8eb7cf9d1bc3eddc1ea1
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Fri Jan 26 10:03:42 2024 +0000
HBASE-28303 Interrupt cache prefetch thread when a heap usage threshold is
reached (#5615)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
Signed-off-by: Peter Somogyi <[email protected]>
---
.../apache/hadoop/hbase/io/hfile/CacheConfig.java | 28 ++++++++++++++++
.../hadoop/hbase/io/hfile/HFilePreadReader.java | 24 ++++++++++----
.../apache/hadoop/hbase/io/hfile/TestPrefetch.java | 38 ++++++++++++++++++++++
.../io/hfile/bucket/TestBucketCachePersister.java | 16 +++++----
4 files changed, 92 insertions(+), 14 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 4587eced616..f89a6194cef 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -99,6 +99,12 @@ public class CacheConfig {
public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
"hbase.bucketcache.persist.intervalinmillis";
+ /**
+ * Configuration key to set the heap usage threshold limit once prefetch
threads should be
+ * interrupted.
+ */
+ public static final String PREFETCH_HEAP_USAGE_THRESHOLD =
"hbase.rs.prefetchheapusage";
+
// Defaults
public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
@@ -111,6 +117,7 @@ public class CacheConfig {
public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD =
Long.MAX_VALUE;
+ public static final double DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD = 1d;
/**
* Whether blocks should be cached on read (default is on if there is a
cache but this can be
@@ -157,6 +164,8 @@ public class CacheConfig {
private final ByteBuffAllocator byteBuffAllocator;
+ private final double heapUsageThreshold;
+
/**
* Create a cache configuration using the specified configuration object and
defaults for family
* level settings. Only use if no column family context.
@@ -201,6 +210,8 @@ public class CacheConfig {
this.cacheCompactedDataOnWrite =
conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
this.cacheCompactedDataOnWriteThreshold =
getCacheCompactedBlocksOnWriteThreshold(conf);
+ this.heapUsageThreshold =
+ conf.getDouble(PREFETCH_HEAP_USAGE_THRESHOLD,
DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD);
this.blockCache = blockCache;
this.byteBuffAllocator = byteBuffAllocator;
}
@@ -222,6 +233,7 @@ public class CacheConfig {
this.dropBehindCompaction = cacheConf.dropBehindCompaction;
this.blockCache = cacheConf.blockCache;
this.byteBuffAllocator = cacheConf.byteBuffAllocator;
+ this.heapUsageThreshold = cacheConf.heapUsageThreshold;
}
private CacheConfig() {
@@ -237,6 +249,7 @@ public class CacheConfig {
this.dropBehindCompaction = false;
this.blockCache = null;
this.byteBuffAllocator = ByteBuffAllocator.HEAP;
+ this.heapUsageThreshold = DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD;
}
/**
@@ -386,6 +399,17 @@ public class CacheConfig {
return false;
}
+ /**
+ * Checks if the current heap usage is below the threshold configured by
+ * "hbase.rs.prefetchheapusage" (0.8 by default).
+ */
+ public boolean isHeapUsageBelowThreshold() {
+ double total = Runtime.getRuntime().maxMemory();
+ double available = Runtime.getRuntime().freeMemory();
+ double usedRatio = 1d - (available / total);
+ return heapUsageThreshold > usedRatio;
+ }
+
/**
* If we make sure the block could not be cached, we will not acquire the
lock otherwise we will
* acquire lock
@@ -413,6 +437,10 @@ public class CacheConfig {
return this.byteBuffAllocator;
}
+ public double getHeapUsageThreshold() {
+ return heapUsageThreshold;
+ }
+
private long getCacheCompactedBlocksOnWriteThreshold(Configuration conf) {
long cacheCompactedBlocksOnWriteThreshold =
conf.getLong(CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 92f6a8169f3..6063ffe6889 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -106,13 +106,23 @@ public class HFilePreadReader extends HFileReaderImpl {
HFileBlock block = prefetchStreamReader.readBlock(offset,
onDiskSizeOfNextBlock,
/* cacheBlock= */true, /* pread= */false, false, false, null,
null, true);
try {
- if (!cacheConf.isInMemory() &&
!cache.blockFitsIntoTheCache(block).orElse(true)) {
- LOG.warn(
- "Interrupting prefetch for file {} because block {} of
size {} "
- + "doesn't fit in the available cache space.",
- path, cacheKey, block.getOnDiskSizeWithHeader());
- interrupted = true;
- break;
+ if (!cacheConf.isInMemory()) {
+ if (!cache.blockFitsIntoTheCache(block).orElse(true)) {
+ LOG.warn(
+ "Interrupting prefetch for file {} because block {} of
size {} "
+ + "doesn't fit in the available cache space.",
+ path, cacheKey, block.getOnDiskSizeWithHeader());
+ interrupted = true;
+ break;
+ }
+ if (!cacheConf.isHeapUsageBelowThreshold()) {
+ LOG.warn(
+ "Interrupting prefetch because heap usage is above the
threshold: {} "
+ + "configured via {}",
+ cacheConf.getHeapUsageThreshold(),
CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD);
+ interrupted = true;
+ break;
+ }
}
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 0b45a930dce..85b9199638c 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -156,6 +157,43 @@ public class TestPrefetch {
poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
}
+ @Test
+ public void testPrefetchHeapUsageAboveThreshold() throws Exception {
+ ColumnFamilyDescriptor columnFamilyDescriptor =
+
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
+ .setBlockCacheEnabled(true).build();
+ HFileContext meta = new
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+ Configuration newConf = new Configuration(conf);
+ newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1);
+ CacheConfig cacheConfig =
+ new CacheConfig(newConf, columnFamilyDescriptor, blockCache,
ByteBuffAllocator.HEAP);
+ Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold",
meta, cacheConfig);
+ MutableInt cachedCount = new MutableInt(0);
+ MutableInt unCachedCount = new MutableInt(0);
+ readStoreFile(storeFile, (r, o) -> {
+ HFileBlock block = null;
+ try {
+ block = r.readBlock(o, -1, false, true, false, true, null, null);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ return block;
+ }, (key, block) -> {
+ boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+ if (
+ block.getBlockType() == BlockType.DATA || block.getBlockType() ==
BlockType.ROOT_INDEX
+ || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+ ) {
+ if (isCached) {
+ cachedCount.increment();
+ } else {
+ unCachedCount.increment();
+ }
+ }
+ }, cacheConfig);
+ assertTrue(unCachedCount.compareTo(cachedCount) > 0);
+ }
+
@Test
public void testPrefetch() throws Exception {
TraceUtil.trace(() -> {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
index f6d3efa9015..a39df7e1471 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -86,9 +86,10 @@ public class TestBucketCachePersister {
return conf;
}
- public BucketCache setupBucketCache(Configuration conf) throws IOException {
- BucketCache bucketCache = new BucketCache("file:" + testDir +
"/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ public BucketCache setupBucketCache(Configuration conf, String
persistentCacheFile)
+ throws IOException {
+ BucketCache bucketCache = new BucketCache("file:" + testDir + "/" +
persistentCacheFile,
+ capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads,
writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
return bucketCache;
}
@@ -103,7 +104,7 @@ public class TestBucketCachePersister {
public void testPrefetchPersistenceCrash() throws Exception {
long bucketCachePersistInterval = 3000;
Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
- BucketCache bucketCache = setupBucketCache(conf);
+ BucketCache bucketCache = setupBucketCache(conf,
"testPrefetchPersistenceCrash");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Cache
@@ -121,7 +122,7 @@ public class TestBucketCachePersister {
public void testPrefetchPersistenceCrashNegative() throws Exception {
long bucketCachePersistInterval = Long.MAX_VALUE;
Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
- BucketCache bucketCache = setupBucketCache(conf);
+ BucketCache bucketCache = setupBucketCache(conf,
"testPrefetchPersistenceCrashNegative");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Cache
@@ -134,7 +135,7 @@ public class TestBucketCachePersister {
@Test
public void testPrefetchListUponBlockEviction() throws Exception {
Configuration conf = setupBucketCacheConfig(200);
- BucketCache bucketCache = setupBucketCache(conf);
+ BucketCache bucketCache = setupBucketCache(conf,
"testPrefetchListUponBlockEviction");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Blocks in cache
@@ -156,7 +157,8 @@ public class TestBucketCachePersister {
@Test
public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception
{
Configuration conf = setupBucketCacheConfig(200);
- BucketCache bucketCache = setupBucketCache(conf);
+ BucketCache bucketCache =
+ setupBucketCache(conf, "testPrefetchBlockEvictionWhilePrefetchRunning");
CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
FileSystem fs = HFileSystem.get(conf);
// Load Blocks in cache