This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 73cb0dddf8b HBASE-28303 Interrupt cache prefetch thread when a heap 
usage threshold is reached (#5615)
73cb0dddf8b is described below

commit 73cb0dddf8bcd0768e2e8eb7cf9d1bc3eddc1ea1
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Fri Jan 26 10:03:42 2024 +0000

    HBASE-28303 Interrupt cache prefetch thread when a heap usage threshold is 
reached (#5615)
    
    Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
    Signed-off-by: Peter Somogyi <[email protected]>
---
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java  | 28 ++++++++++++++++
 .../hadoop/hbase/io/hfile/HFilePreadReader.java    | 24 ++++++++++----
 .../apache/hadoop/hbase/io/hfile/TestPrefetch.java | 38 ++++++++++++++++++++++
 .../io/hfile/bucket/TestBucketCachePersister.java  | 16 +++++----
 4 files changed, 92 insertions(+), 14 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 4587eced616..f89a6194cef 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -99,6 +99,12 @@ public class CacheConfig {
   public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
     "hbase.bucketcache.persist.intervalinmillis";
 
+  /**
+   * Configuration key to set the heap usage threshold limit beyond which prefetch 
threads should be
+   * interrupted.
+   */
+  public static final String PREFETCH_HEAP_USAGE_THRESHOLD = 
"hbase.rs.prefetchheapusage";
+
   // Defaults
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
   public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
@@ -111,6 +117,7 @@ public class CacheConfig {
   public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
   public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
   public static final long DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD = 
Long.MAX_VALUE;
+  public static final double DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD = 1d;
 
   /**
    * Whether blocks should be cached on read (default is on if there is a 
cache but this can be
@@ -157,6 +164,8 @@ public class CacheConfig {
 
   private final ByteBuffAllocator byteBuffAllocator;
 
+  private final double heapUsageThreshold;
+
   /**
    * Create a cache configuration using the specified configuration object and 
defaults for family
    * level settings. Only use if no column family context.
@@ -201,6 +210,8 @@ public class CacheConfig {
     this.cacheCompactedDataOnWrite =
       conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, 
DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE);
     this.cacheCompactedDataOnWriteThreshold = 
getCacheCompactedBlocksOnWriteThreshold(conf);
+    this.heapUsageThreshold =
+      conf.getDouble(PREFETCH_HEAP_USAGE_THRESHOLD, 
DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD);
     this.blockCache = blockCache;
     this.byteBuffAllocator = byteBuffAllocator;
   }
@@ -222,6 +233,7 @@ public class CacheConfig {
     this.dropBehindCompaction = cacheConf.dropBehindCompaction;
     this.blockCache = cacheConf.blockCache;
     this.byteBuffAllocator = cacheConf.byteBuffAllocator;
+    this.heapUsageThreshold = cacheConf.heapUsageThreshold;
   }
 
   private CacheConfig() {
@@ -237,6 +249,7 @@ public class CacheConfig {
     this.dropBehindCompaction = false;
     this.blockCache = null;
     this.byteBuffAllocator = ByteBuffAllocator.HEAP;
+    this.heapUsageThreshold = DEFAULT_PREFETCH_HEAP_USAGE_THRESHOLD;
   }
 
   /**
@@ -386,6 +399,17 @@ public class CacheConfig {
     return false;
   }
 
+  /**
+   * Checks if the current heap usage is below the threshold configured by
+   * "hbase.rs.prefetchheapusage" (1.0 by default, i.e. prefetch is never interrupted).
+   */
+  public boolean isHeapUsageBelowThreshold() {
+    double total = Runtime.getRuntime().maxMemory();
+    double available = Runtime.getRuntime().freeMemory();
+    double usedRatio = 1d - (available / total);
+    return heapUsageThreshold > usedRatio;
+  }
+
   /**
    * If we make sure the block could not be cached, we will not acquire the 
lock otherwise we will
    * acquire lock
@@ -413,6 +437,10 @@ public class CacheConfig {
     return this.byteBuffAllocator;
   }
 
+  public double getHeapUsageThreshold() {
+    return heapUsageThreshold;
+  }
+
   private long getCacheCompactedBlocksOnWriteThreshold(Configuration conf) {
     long cacheCompactedBlocksOnWriteThreshold =
       conf.getLong(CACHE_COMPACTED_BLOCKS_ON_WRITE_THRESHOLD_KEY,
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 92f6a8169f3..6063ffe6889 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -106,13 +106,23 @@ public class HFilePreadReader extends HFileReaderImpl {
               HFileBlock block = prefetchStreamReader.readBlock(offset, 
onDiskSizeOfNextBlock,
                 /* cacheBlock= */true, /* pread= */false, false, false, null, 
null, true);
               try {
-                if (!cacheConf.isInMemory() && 
!cache.blockFitsIntoTheCache(block).orElse(true)) {
-                  LOG.warn(
-                    "Interrupting prefetch for file {} because block {} of 
size {} "
-                      + "doesn't fit in the available cache space.",
-                    path, cacheKey, block.getOnDiskSizeWithHeader());
-                  interrupted = true;
-                  break;
+                if (!cacheConf.isInMemory()) {
+                  if (!cache.blockFitsIntoTheCache(block).orElse(true)) {
+                    LOG.warn(
+                      "Interrupting prefetch for file {} because block {} of 
size {} "
+                        + "doesn't fit in the available cache space.",
+                      path, cacheKey, block.getOnDiskSizeWithHeader());
+                    interrupted = true;
+                    break;
+                  }
+                  if (!cacheConf.isHeapUsageBelowThreshold()) {
+                    LOG.warn(
+                      "Interrupting prefetch because heap usage is above the 
threshold: {} "
+                        + "configured via {}",
+                      cacheConf.getHeapUsageThreshold(), 
CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD);
+                    interrupted = true;
+                    break;
+                  }
                 }
                 onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
                 offset += block.getOnDiskSizeWithHeader();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 0b45a930dce..85b9199638c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -156,6 +157,43 @@ public class TestPrefetch {
       poolExecutor.getCompletedTaskCount() + poolExecutor.getQueue().size());
   }
 
+  @Test
+  public void testPrefetchHeapUsageAboveThreshold() throws Exception {
+    ColumnFamilyDescriptor columnFamilyDescriptor =
+      
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setPrefetchBlocksOnOpen(true)
+        .setBlockCacheEnabled(true).build();
+    HFileContext meta = new 
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+    Configuration newConf = new Configuration(conf);
+    newConf.setDouble(CacheConfig.PREFETCH_HEAP_USAGE_THRESHOLD, 0.1);
+    CacheConfig cacheConfig =
+      new CacheConfig(newConf, columnFamilyDescriptor, blockCache, 
ByteBuffAllocator.HEAP);
+    Path storeFile = writeStoreFile("testPrefetchHeapUsageAboveThreshold", 
meta, cacheConfig);
+    MutableInt cachedCount = new MutableInt(0);
+    MutableInt unCachedCount = new MutableInt(0);
+    readStoreFile(storeFile, (r, o) -> {
+      HFileBlock block = null;
+      try {
+        block = r.readBlock(o, -1, false, true, false, true, null, null);
+      } catch (IOException e) {
+        fail(e.getMessage());
+      }
+      return block;
+    }, (key, block) -> {
+      boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+      if (
+        block.getBlockType() == BlockType.DATA || block.getBlockType() == 
BlockType.ROOT_INDEX
+          || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+      ) {
+        if (isCached) {
+          cachedCount.increment();
+        } else {
+          unCachedCount.increment();
+        }
+      }
+    }, cacheConfig);
+    assertTrue(unCachedCount.compareTo(cachedCount) > 0);
+  }
+
   @Test
   public void testPrefetch() throws Exception {
     TraceUtil.trace(() -> {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
index f6d3efa9015..a39df7e1471 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -86,9 +86,10 @@ public class TestBucketCachePersister {
     return conf;
   }
 
-  public BucketCache setupBucketCache(Configuration conf) throws IOException {
-    BucketCache bucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
-      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+  public BucketCache setupBucketCache(Configuration conf, String 
persistentCacheFile)
+    throws IOException {
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/" + 
persistentCacheFile,
+      capacitySize, constructedBlockSize, constructedBlockSizes, writeThreads, 
writerQLen,
       testDir + "/bucket.persistence", 60 * 1000, conf);
     return bucketCache;
   }
@@ -103,7 +104,7 @@ public class TestBucketCachePersister {
   public void testPrefetchPersistenceCrash() throws Exception {
     long bucketCachePersistInterval = 3000;
     Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
-    BucketCache bucketCache = setupBucketCache(conf);
+    BucketCache bucketCache = setupBucketCache(conf, 
"testPrefetchPersistenceCrash");
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Cache
@@ -121,7 +122,7 @@ public class TestBucketCachePersister {
   public void testPrefetchPersistenceCrashNegative() throws Exception {
     long bucketCachePersistInterval = Long.MAX_VALUE;
     Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
-    BucketCache bucketCache = setupBucketCache(conf);
+    BucketCache bucketCache = setupBucketCache(conf, 
"testPrefetchPersistenceCrashNegative");
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Cache
@@ -134,7 +135,7 @@ public class TestBucketCachePersister {
   @Test
   public void testPrefetchListUponBlockEviction() throws Exception {
     Configuration conf = setupBucketCacheConfig(200);
-    BucketCache bucketCache = setupBucketCache(conf);
+    BucketCache bucketCache = setupBucketCache(conf, 
"testPrefetchListUponBlockEviction");
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Blocks in cache
@@ -156,7 +157,8 @@ public class TestBucketCachePersister {
   @Test
   public void testPrefetchBlockEvictionWhilePrefetchRunning() throws Exception 
{
     Configuration conf = setupBucketCacheConfig(200);
-    BucketCache bucketCache = setupBucketCache(conf);
+    BucketCache bucketCache =
+      setupBucketCache(conf, "testPrefetchBlockEvictionWhilePrefetchRunning");
     CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
     FileSystem fs = HFileSystem.get(conf);
     // Load Blocks in cache

Reply via email to