This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 632a8c882c9 HBASE-29288 Avoid adding new blocks during prefetch if usage is greater than accept factor (#6965)
632a8c882c9 is described below

commit 632a8c882c96b252cb76f53b6b0a4631fdf9301f
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Mon May 12 10:43:33 2025 +0100

    HBASE-29288 Avoid adding new blocks during prefetch if usage is greater than accept factor (#6965)
    
    Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
---
 .../hadoop/hbase/io/hfile/BlockCacheUtil.java      |   1 +
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  44 +++++---
 .../hadoop/hbase/io/hfile/TestPrefetchRSClose.java |  15 ++-
 .../hbase/io/hfile/bucket/TestBucketCache.java     |   8 +-
 .../io/hfile/bucket/TestBucketWriterThread.java    |   4 +-
 .../{ => bucket}/TestPrefetchWithBucketCache.java  | 116 ++++++++++++++++++---
 .../hadoop/hbase/io/hfile/bucket/TestRAMCache.java |   2 +-
 7 files changed, 153 insertions(+), 37 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index 7324701efe5..2c1559b1147 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -285,6 +285,7 @@ public class BlockCacheUtil {
       .withPrevBlockOffset(block.getPrevBlockOffset()).withByteBuff(buff)
       
.withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1)
       .withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + 
numBytes)
+      .withNextBlockOnDiskSize(block.getNextBlockOnDiskSize())
       .withHFileContext(cloneContext(block.getHFileContext()))
       
.withByteBuffAllocator(cacheConf.getByteBuffAllocator()).withShared(!buff.hasArray()).build();
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index a52c00c83da..7802d868bea 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -292,8 +292,8 @@ public class BucketCache implements BlockCache, HeapSize {
     "hbase.bucketcache.persistent.file.integrity.check.algorithm";
   private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
 
-  public static final String QUEUE_ADDITION_WAIT_TIME = 
"hbase.bucketcache.queue.addition.waittime";
-  static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
+  static final String QUEUE_ADDITION_WAIT_TIME = 
"hbase.bucketcache.queue.addition.waittime";
+  private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
   private long queueAdditionWaitTime;
   /**
    * Use {@link java.security.MessageDigest} class's encryption algorithms to 
check persistent file
@@ -599,7 +599,7 @@ public class BucketCache implements BlockCache, HeapSize {
     LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
     // Stuff the entry into the RAM cache so it can get drained to the 
persistent store
     RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, 
accessCount.incrementAndGet(),
-      inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine);
+      inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine, wait);
     /**
      * Don't use ramCache.put(cacheKey, re) here. because there may be a 
existing entry with same
      * key in ramCache, the heap size of bucket cache need to update if 
replacing entry from
@@ -1403,8 +1403,8 @@ public class BucketCache implements BlockCache, HeapSize {
         // transferred with our current IOEngines. Should take care, when we 
have new kinds of
         // IOEngine in the future.
         metaBuff.clear();
-        BucketEntry bucketEntry =
-          re.writeToCache(ioEngine, bucketAllocator, realCacheSize, 
this::createRecycler, metaBuff);
+        BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, 
realCacheSize,
+          this::createRecycler, metaBuff, acceptableSize());
         // Successfully added. Up index and add bucketEntry. Clear io 
exceptions.
         bucketEntries[index] = bucketEntry;
         if (ioErrorStartTime > 0) {
@@ -1425,8 +1425,11 @@ public class BucketCache implements BlockCache, HeapSize 
{
         index++;
       } catch (CacheFullException cfe) {
         // Cache full when we tried to add. Try freeing space and then 
retrying (don't up index)
-        if (!freeInProgress) {
+        if (!freeInProgress && !re.isPrefetch()) {
           freeSpace("Full!");
+        } else if (re.isPrefetch()) {
+          bucketEntries[index] = null;
+          index++;
         } else {
           Thread.sleep(50);
         }
@@ -1480,13 +1483,13 @@ public class BucketCache implements BlockCache, 
HeapSize {
           return null;
         });
       }
+      long used = bucketAllocator.getUsedSize();
+      if (!entries.get(i).isPrefetch() && used > acceptableSize()) {
+        LOG.debug("Calling freeSpace for block: {}", entries.get(i).getKey());
+        freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
+      }
     }
 
-    long used = bucketAllocator.getUsedSize();
-    if (used > acceptableSize()) {
-      freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
-    }
-    return;
   }
 
   /**
@@ -1967,13 +1970,16 @@ public class BucketCache implements BlockCache, 
HeapSize {
     private boolean inMemory;
     private boolean isCachePersistent;
 
+    private boolean isPrefetch;
+
     RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, 
boolean inMemory,
-      boolean isCachePersistent) {
+      boolean isCachePersistent, boolean isPrefetch) {
       this.key = bck;
       this.data = data;
       this.accessCounter = accessCounter;
       this.inMemory = inMemory;
       this.isCachePersistent = isCachePersistent;
+      this.isPrefetch = isPrefetch;
     }
 
     public Cacheable getData() {
@@ -1984,6 +1990,10 @@ public class BucketCache implements BlockCache, HeapSize 
{
       return key;
     }
 
+    public boolean isPrefetch() {
+      return isPrefetch;
+    }
+
     public void access(long accessCounter) {
       this.accessCounter = accessCounter;
     }
@@ -1997,7 +2007,7 @@ public class BucketCache implements BlockCache, HeapSize {
 
     public BucketEntry writeToCache(final IOEngine ioEngine, final 
BucketAllocator alloc,
       final LongAdder realCacheSize, Function<BucketEntry, Recycler> 
createRecycler,
-      ByteBuffer metaBuff) throws IOException {
+      ByteBuffer metaBuff, final Long acceptableSize) throws IOException {
       int len = data.getSerializedLength();
       // This cacheable thing can't be serialized
       if (len == 0) {
@@ -2008,6 +2018,14 @@ public class BucketCache implements BlockCache, HeapSize 
{
                            // recovery
       }
       long offset = alloc.allocateBlock(len);
+      // In the case of prefetch, we want to avoid freeSpace runs when the 
cache is full.
+      // this makes the cache allocation more predictable, and is particularly 
important
+      // when persistent cache is enabled, as it won't trigger evictions of 
the recovered blocks,
+      // which are likely the most accessed and relevant blocks in the cache.
+      if (isPrefetch() && alloc.getUsedSize() > acceptableSize) {
+        alloc.freeBlock(offset, len);
+        return null;
+      }
       boolean succ = false;
       BucketEntry bucketEntry = null;
       try {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
index a5023d5da00..2d9dd8b6c6c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
 import org.apache.hadoop.hbase.StartTestingClusterOption;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
@@ -114,11 +117,17 @@ public class TestPrefetchRSClose {
 
     // Default interval for cache persistence is 1000ms. So after 1000ms, both 
the persistence files
     // should exist.
-
     HRegionServer regionServingRS = cluster.getRegionServer(0);
-
     Admin admin = TEST_UTIL.getAdmin();
-    List<String> cachedFilesList = 
admin.getCachedFilesList(regionServingRS.getServerName());
+    List<String> cachedFilesList = new ArrayList<>();
+    Waiter.waitFor(conf, 5000, () -> {
+      try {
+        
cachedFilesList.addAll(admin.getCachedFilesList(regionServingRS.getServerName()));
+      } catch (IOException e) {
+        // let the test try again
+      }
+      return cachedFilesList.size() > 0;
+    });
     assertEquals(1, cachedFilesList.size());
     for (HStoreFile h : 
regionServingRS.getRegions().get(0).getStores().get(0).getStorefiles()) {
       assertTrue(cachedFilesList.contains(h.getPath().getName()));
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index b1c43375510..9a79ad3fc24 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -748,8 +748,8 @@ public class TestBucketCache {
       HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
     HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, 
ByteBuff.wrap(buf),
       HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
-    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
-    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);
+    RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false);
+    RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false);
 
     assertFalse(cache.containsKey(key1));
     assertNull(cache.putIfAbsent(key1, re1));
@@ -796,12 +796,12 @@ public class TestBucketCache {
     BucketAllocator allocator = new BucketAllocator(availableSpace, null);
 
     BlockCacheKey key = new BlockCacheKey("dummy", 1L);
-    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);
+    RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false);
 
     Assert.assertEquals(0, allocator.getUsedSize());
     try {
       re.writeToCache(ioEngine, allocator, null, null,
-        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
+        ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE);
       Assert.fail();
     } catch (Exception e) {
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index facbe7c50d1..7fc2b1355ce 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -138,7 +138,7 @@ public class TestBucketWriterThread {
     RAMQueueEntry rqe = q.remove();
     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
     Mockito.doThrow(new 
IOException("Mocked!")).when(spiedRqe).writeToCache(Mockito.any(),
-      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
     assertTrue(bc.blocksByHFile.isEmpty());
@@ -158,7 +158,7 @@ public class TestBucketWriterThread {
     final CacheFullException cfe = new CacheFullException(0, 0);
     BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
     
Mockito.doThrow(cfe).doReturn(mockedBucketEntry).when(spiedRqe).writeToCache(Mockito.any(),
-      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+      Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), 
Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchWithBucketCache.java
similarity index 76%
rename from 
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
rename to 
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchWithBucketCache.java
index 8ed49b3b796..6fdb56bfc6c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchWithBucketCache.java
@@ -15,12 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hbase.io.hfile;
+package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
 import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
 import static 
org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
+import static 
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -30,6 +32,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
@@ -48,8 +51,19 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
+import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
@@ -202,20 +216,21 @@ public class TestPrefetchWithBucketCache {
     conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
     conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
     conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
-    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
-    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
+    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
     blockCache = BlockCacheFactory.createBlockCache(conf);
     cacheConf = new CacheConfig(conf, blockCache);
     Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
     // Prefetches the file blocks
     LOG.debug("First read should prefetch the blocks.");
     createReaderAndWaitForPrefetchInterruption(storeFile);
+    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
+      () -> PrefetchExecutor.isCompleted(storeFile));
     BucketCache bc = 
BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
-    long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
-    LOG.debug("evictions after first prefetch: {}", 
bc.getStats().getEvictionCount());
+    long evictedFirstPrefetch = bc.getStats().getEvictedCount();
     HFile.Reader reader = 
createReaderAndWaitForPrefetchInterruption(storeFile);
-    LOG.debug("evictions after second prefetch: {}", 
bc.getStats().getEvictionCount());
-    assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 
10);
+    assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount());
     HFileScanner scanner = reader.getScanner(conf, true, true);
     scanner.seekTo();
     while (scanner.next()) {
@@ -223,10 +238,17 @@ public class TestPrefetchWithBucketCache {
       LOG.trace("Iterating the full scan to evict some blocks");
     }
     scanner.close();
-    LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
+    Waiter.waitFor(conf, 5000, () -> {
+      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
+        if (!queue.isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    });
     // The scanner should had triggered at least 3x evictions from the 
prefetch,
     // as we try cache each block without interruption.
-    assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
+    assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch);
   }
 
   @Test
@@ -234,8 +256,8 @@ public class TestPrefetchWithBucketCache {
     conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
     conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
     conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
-    conf.setDouble("hbase.bucketcache.minfactor", 0.95);
-    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
     blockCache = BlockCacheFactory.createBlockCache(conf);
     ColumnFamilyDescriptor family =
       
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
@@ -245,7 +267,73 @@ public class TestPrefetchWithBucketCache {
     LOG.debug("First read should prefetch the blocks.");
     createReaderAndWaitForPrefetchInterruption(storeFile);
     BucketCache bc = 
BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
-    assertTrue(bc.getStats().getEvictedCount() > 200);
+    Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile));
+    long evictions = bc.getStats().getEvictedCount();
+    LOG.debug("Total evicted at this point: {}", evictions);
+    // creates another reader, now that cache is full, no block would fit and 
prefetch should not
+    // trigger any new evictions
+    createReaderAndWaitForPrefetchInterruption(storeFile);
+    assertEquals(evictions, bc.getStats().getEvictedCount());
+  }
+
+  @Test
+  public void testPrefetchRunNoEvictions() throws Exception {
+    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
+    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
+    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
+    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    cacheConf = new CacheConfig(conf, blockCache);
+    Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000);
+    // Prefetches the file blocks
+    createReaderAndWaitForPrefetchInterruption(storeFile);
+    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
+      () -> PrefetchExecutor.isCompleted(storeFile));
+    BucketCache bc = 
BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
+    // Wait until all cache writer queues are empty
+    Waiter.waitFor(conf, 5000, () -> {
+      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
+        if (!queue.isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    });
+    // With the wait time configuration, prefetch should trigger no evictions 
once it reaches
+    // cache capacity
+    assertEquals(0, bc.getStats().getEvictedCount());
+  }
+
+  @Test
+  public void testPrefetchRunTriggersEvictions() throws Exception {
+    conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
+    conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
+    conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+    conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
+    conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    cacheConf = new CacheConfig(conf, blockCache);
+    Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
+    // Prefetches the file blocks
+    createReaderAndWaitForPrefetchInterruption(storeFile);
+    Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
+      () -> PrefetchExecutor.isCompleted(storeFile));
+    BucketCache bc = 
BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
+    // Wait until all cache writer queues are empty
+    Waiter.waitFor(conf, 5000, () -> {
+      for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
+        if (!queue.isEmpty()) {
+          return false;
+        }
+      }
+      return true;
+    });
+    // With the queue addition wait time set to 0, this test expects prefetch to trigger
+    // evictions once it reaches cache capacity
+    assertNotEquals(0, bc.getStats().getEvictedCount());
   }
 
   @Test
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
index 58d9385f57e..092638d8997 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
@@ -90,7 +90,7 @@ public class TestRAMCache {
     MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
       ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
       new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
-    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false);
+    RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false, false);
 
     Assert.assertNull(cache.putIfAbsent(key, re));
     Assert.assertEquals(cache.putIfAbsent(key, re), re);

Reply via email to