This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 2aa8e2a0e78 HBASE-29288 Avoid adding new blocks during prefetch if usage is greater than accept factor (#6965) (#6985)
2aa8e2a0e78 is described below
commit 2aa8e2a0e788d8ed0fcecad0572edab251beba08
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Thu May 15 11:24:18 2025 +0100
HBASE-29288 Avoid adding new blocks during prefetch if usage is greater than accept factor (#6965) (#6985)
Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
---
.../hadoop/hbase/io/hfile/BlockCacheUtil.java | 1 +
.../hadoop/hbase/io/hfile/HFilePreadReader.java | 7 +-
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 44 +++++---
.../hadoop/hbase/io/hfile/TestPrefetchRSClose.java | 15 ++-
.../hbase/io/hfile/bucket/TestBucketCache.java | 8 +-
.../io/hfile/bucket/TestBucketWriterThread.java | 4 +-
.../{ => bucket}/TestPrefetchWithBucketCache.java | 121 ++++++++++++++++++---
.../hadoop/hbase/io/hfile/bucket/TestRAMCache.java | 2 +-
8 files changed, 163 insertions(+), 39 deletions(-)
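
Illustrative sketch (not part of the commit): the core idea of this change is that blocks written by prefetch are simply dropped, instead of triggering a freeSpace/eviction run, once bucket usage exceeds the acceptable watermark. The names below (Allocator, admitBlock) are hypothetical stand-ins, not the actual BucketCache API.

// Sketch only: simplified stand-ins for BucketAllocator and the writeToCache admission check.
public class PrefetchAdmissionSketch {

  /** Minimal allocator view: just the calls the admission check needs. */
  interface Allocator {
    long getUsedSize();
    void freeBlock(long offset, int len);
  }

  /**
   * Returns false (and releases the allocated space) when the entry came from prefetch
   * and cache usage already exceeds the acceptable size; prefetch then skips the block
   * instead of forcing evictions of blocks already in the cache.
   */
  static boolean admitBlock(boolean isPrefetch, Allocator alloc, long offset, int len,
    long acceptableSize) {
    if (isPrefetch && alloc.getUsedSize() > acceptableSize) {
      alloc.freeBlock(offset, len);
      return false;
    }
    return true;
  }
}
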
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index 7324701efe5..2c1559b1147 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -285,6 +285,7 @@ public class BlockCacheUtil {
.withPrevBlockOffset(block.getPrevBlockOffset()).withByteBuff(buff)
.withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1)
.withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + numBytes)
+ .withNextBlockOnDiskSize(block.getNextBlockOnDiskSize())
.withHFileContext(cloneContext(block.getHFileContext()))
.withByteBuffAllocator(cacheConf.getByteBuffAllocator()).withShared(!buff.hasArray()).build();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 6266be6cbcc..533021a428d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -97,6 +97,9 @@ public class HFilePreadReader extends HFileReaderImpl {
+ "Skipping prefetch, the block is already cached.", size,
cacheKey);
blockCount++;
dataBlockCount++;
+ // We need to reset this here, because we don't know the size of next block, since
+ // we never recovered the current block.
+ onDiskSizeOfNextBlock = -1;
continue;
} else {
LOG.debug("Found block for cache key {}, but couldn't get
its size. "
@@ -150,8 +153,8 @@ public class HFilePreadReader extends HFileReaderImpl {
}
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
- if (LOG.isTraceEnabled()) {
- LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end),
e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Prefetch " + getPathOffsetEndStr(path, offset, end),
e);
}
} catch (Throwable e) {
// Other exceptions are interesting
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index faf8313b7a2..8af41da5b23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -287,8 +287,8 @@ public class BucketCache implements BlockCache, HeapSize {
"hbase.bucketcache.persistent.file.integrity.check.algorithm";
private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
- public static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime";
- static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
+ static final String QUEUE_ADDITION_WAIT_TIME = "hbase.bucketcache.queue.addition.waittime";
+ private static final long DEFAULT_QUEUE_ADDITION_WAIT_TIME = 0;
private long queueAdditionWaitTime;
/**
* Use {@link java.security.MessageDigest} class's encryption algorithms to check persistent file
@@ -586,7 +586,7 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
// Stuff the entry into the RAM cache so it can get drained to the persistent store
RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
- inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine);
+ inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine, wait);
/**
* Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
* key in ramCache, the heap size of bucket cache need to update if replacing entry from
@@ -1390,8 +1390,8 @@ public class BucketCache implements BlockCache, HeapSize {
// transferred with our current IOEngines. Should take care, when we have new kinds of
// IOEngine in the future.
metaBuff.clear();
- BucketEntry bucketEntry =
- re.writeToCache(ioEngine, bucketAllocator, realCacheSize, this::createRecycler, metaBuff);
+ BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize,
+ this::createRecycler, metaBuff, acceptableSize());
// Successfully added. Up index and add bucketEntry. Clear io exceptions.
bucketEntries[index] = bucketEntry;
if (ioErrorStartTime > 0) {
@@ -1412,8 +1412,11 @@ public class BucketCache implements BlockCache, HeapSize {
index++;
} catch (CacheFullException cfe) {
// Cache full when we tried to add. Try freeing space and then retrying (don't up index)
- if (!freeInProgress) {
+ if (!freeInProgress && !re.isPrefetch()) {
freeSpace("Full!");
+ } else if (re.isPrefetch()) {
+ bucketEntries[index] = null;
+ index++;
} else {
Thread.sleep(50);
}
@@ -1467,13 +1470,13 @@ public class BucketCache implements BlockCache, HeapSize {
return null;
});
}
+ long used = bucketAllocator.getUsedSize();
+ if (!entries.get(i).isPrefetch() && used > acceptableSize()) {
+ LOG.debug("Calling freeSpace for block: {}", entries.get(i).getKey());
+ freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
+ }
}
- long used = bucketAllocator.getUsedSize();
- if (used > acceptableSize()) {
- freeSpace("Used=" + used + " > acceptable=" + acceptableSize());
- }
- return;
}
/**
@@ -1955,13 +1958,16 @@ public class BucketCache implements BlockCache, HeapSize {
private boolean inMemory;
private boolean isCachePersistent;
+ private boolean isPrefetch;
+
RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter, boolean inMemory,
- boolean isCachePersistent) {
+ boolean isCachePersistent, boolean isPrefetch) {
this.key = bck;
this.data = data;
this.accessCounter = accessCounter;
this.inMemory = inMemory;
this.isCachePersistent = isCachePersistent;
+ this.isPrefetch = isPrefetch;
}
public Cacheable getData() {
@@ -1972,6 +1978,10 @@ public class BucketCache implements BlockCache, HeapSize {
return key;
}
+ public boolean isPrefetch() {
+ return isPrefetch;
+ }
+
public void access(long accessCounter) {
this.accessCounter = accessCounter;
}
@@ -1985,7 +1995,7 @@ public class BucketCache implements BlockCache, HeapSize {
public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator alloc,
final LongAdder realCacheSize, Function<BucketEntry, Recycler> createRecycler,
- ByteBuffer metaBuff) throws IOException {
+ ByteBuffer metaBuff, final Long acceptableSize) throws IOException {
int len = data.getSerializedLength();
// This cacheable thing can't be serialized
if (len == 0) {
@@ -1996,6 +2006,14 @@ public class BucketCache implements BlockCache, HeapSize {
// recovery
}
long offset = alloc.allocateBlock(len);
+ // In the case of prefetch, we want to avoid freeSpace runs when the cache is full.
+ // this makes the cache allocation more predictable, and is particularly important
+ // when persistent cache is enabled, as it won't trigger evictions of the recovered blocks,
+ // which are likely the most accessed and relevant blocks in the cache.
+ if (isPrefetch() && alloc.getUsedSize() > acceptableSize) {
+ alloc.freeBlock(offset, len);
+ return null;
+ }
boolean succ = false;
BucketEntry bucketEntry = null;
try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
index 7ca5e34e6db..8f32127443e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -30,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
@@ -113,11 +116,17 @@ public class TestPrefetchRSClose {
// Default interval for cache persistence is 1000ms. So after 1000ms, both the persistence files
// should exist.
-
HRegionServer regionServingRS = cluster.getRegionServer(0);
-
Admin admin = TEST_UTIL.getAdmin();
- List<String> cachedFilesList = admin.getCachedFilesList(regionServingRS.getServerName());
+ List<String> cachedFilesList = new ArrayList<>();
+ Waiter.waitFor(conf, 5000, () -> {
+ try {
+ cachedFilesList.addAll(admin.getCachedFilesList(regionServingRS.getServerName()));
+ } catch (IOException e) {
+ // let the test try again
+ }
+ return cachedFilesList.size() > 0;
+ });
assertEquals(1, cachedFilesList.size());
for (HStoreFile h : regionServingRS.getRegions().get(0).getStores().get(0).getStorefiles()) {
assertTrue(cachedFilesList.contains(h.getPath().getName()));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 10d0cf6dd3e..156f6aaae60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -748,8 +748,8 @@ public class TestBucketCache {
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1, ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
- RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
- RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);
+ RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false, false);
+ RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false, false);
assertFalse(cache.containsKey(key1));
assertNull(cache.putIfAbsent(key1, re1));
@@ -796,12 +796,12 @@ public class TestBucketCache {
BucketAllocator allocator = new BucketAllocator(availableSpace, null);
BlockCacheKey key = new BlockCacheKey("dummy", 1L);
- RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);
+ RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false, false);
Assert.assertEquals(0, allocator.getUsedSize());
try {
re.writeToCache(ioEngine, allocator, null, null,
- ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE));
+ ByteBuffer.allocate(HFileBlock.BLOCK_METADATA_SPACE), Long.MAX_VALUE);
Assert.fail();
} catch (Exception e) {
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index facbe7c50d1..7fc2b1355ce 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -138,7 +138,7 @@ public class TestBucketWriterThread {
RAMQueueEntry rqe = q.remove();
RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).writeToCache(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
assertTrue(bc.blocksByHFile.isEmpty());
@@ -158,7 +158,7 @@ public class TestBucketWriterThread {
final CacheFullException cfe = new CacheFullException(0, 0);
BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class);
Mockito.doThrow(cfe).doReturn(mockedBucketEntry).when(spiedRqe).writeToCache(Mockito.any(),
- Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+ Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchWithBucketCache.java
similarity index 75%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchWithBucketCache.java
index 82d0f99356b..db152b5f2c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchWithBucketCache.java
@@ -15,12 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hbase.io.hfile;
+package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import static org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.QUEUE_ADDITION_WAIT_TIME;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -30,6 +32,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
@@ -48,8 +51,19 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
-import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
+import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
@@ -202,20 +216,21 @@ public class TestPrefetchWithBucketCache {
conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
- conf.setDouble("hbase.bucketcache.minfactor", 0.95);
- conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
+ conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+ conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
+ conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
blockCache = BlockCacheFactory.createBlockCache(conf);
cacheConf = new CacheConfig(conf, blockCache);
Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
// Prefetches the file blocks
LOG.debug("First read should prefetch the blocks.");
createReaderAndWaitForPrefetchInterruption(storeFile);
+ Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
+ () -> PrefetchExecutor.isCompleted(storeFile));
BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
- long evictionsFirstPrefetch = bc.getStats().getEvictionCount();
- LOG.debug("evictions after first prefetch: {}",
bc.getStats().getEvictionCount());
+ long evictedFirstPrefetch = bc.getStats().getEvictedCount();
HFile.Reader reader = createReaderAndWaitForPrefetchInterruption(storeFile);
- LOG.debug("evictions after second prefetch: {}",
bc.getStats().getEvictionCount());
- assertTrue((bc.getStats().getEvictionCount() - evictionsFirstPrefetch) < 10);
+ assertEquals(evictedFirstPrefetch, bc.getStats().getEvictedCount());
HFileScanner scanner = reader.getScanner(conf, true, true);
scanner.seekTo();
while (scanner.next()) {
@@ -223,10 +238,17 @@ public class TestPrefetchWithBucketCache {
LOG.trace("Iterating the full scan to evict some blocks");
}
scanner.close();
- LOG.debug("evictions after scanner: {}", bc.getStats().getEvictionCount());
+ Waiter.waitFor(conf, 5000, () -> {
+ for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
+ if (!queue.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ });
// The scanner should had triggered at least 3x evictions from the prefetch,
// as we try cache each block without interruption.
- assertTrue(bc.getStats().getEvictionCount() > evictionsFirstPrefetch);
+ assertTrue(bc.getStats().getEvictedCount() > evictedFirstPrefetch);
}
@Test
@@ -234,8 +256,8 @@ public class TestPrefetchWithBucketCache {
conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
- conf.setDouble("hbase.bucketcache.minfactor", 0.95);
- conf.setDouble("hbase.bucketcache.extrafreefactor", 0.01);
+ conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+ conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
blockCache = BlockCacheFactory.createBlockCache(conf);
ColumnFamilyDescriptor family =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("f")).setInMemory(true).build();
@@ -245,7 +267,78 @@ public class TestPrefetchWithBucketCache {
LOG.debug("First read should prefetch the blocks.");
createReaderAndWaitForPrefetchInterruption(storeFile);
BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
- assertTrue(bc.getStats().getEvictedCount() > 200);
+ Waiter.waitFor(conf, 1000, () -> PrefetchExecutor.isCompleted(storeFile));
+ long evictions = bc.getStats().getEvictedCount();
+ LOG.debug("Total evicted at this point: {}", evictions);
+ // creates another reader, now that cache is full, no block would fit and prefetch should not
+ // trigger any new evictions
+ createReaderAndWaitForPrefetchInterruption(storeFile);
+ assertEquals(evictions, bc.getStats().getEvictedCount());
+ }
+
+ @Test
+ public void testPrefetchRunNoEvictions() throws Exception {
+ conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
+ conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
+ conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
+ conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+ conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
+ conf.setLong(QUEUE_ADDITION_WAIT_TIME, 100);
+ blockCache = BlockCacheFactory.createBlockCache(conf);
+ cacheConf = new CacheConfig(conf, blockCache);
+ Path storeFile = writeStoreFile("testPrefetchRunNoEvictions", 10000);
+ // Prefetches the file blocks
+ createReaderAndWaitForPrefetchInterruption(storeFile);
+ Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
+ () -> PrefetchExecutor.isCompleted(storeFile));
+ BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
+ // Wait until all cache writer queues are empty
+ Waiter.waitFor(conf, 5000, () -> {
+ for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
+ if (!queue.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ });
+ // With the wait time configuration, prefetch should trigger no evictions once it reaches
+ // cache capacity
+ assertEquals(0, bc.getStats().getEvictedCount());
+ }
+
+ @Test
+ public void testPrefetchRunTriggersEvictions() throws Exception {
+ conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
+ conf.set(BUCKET_CACHE_BUCKETS_KEY, "3072");
+ conf.setDouble("hbase.bucketcache.acceptfactor", 0.98);
+ conf.setDouble("hbase.bucketcache.minfactor", 0.98);
+ conf.setDouble("hbase.bucketcache.extrafreefactor", 0.0);
+ conf.setLong(QUEUE_ADDITION_WAIT_TIME, 0);
+ blockCache = BlockCacheFactory.createBlockCache(conf);
+ cacheConf = new CacheConfig(conf, blockCache);
+ Path storeFile = writeStoreFile("testPrefetchInterruptOnCapacity", 10000);
+ // Prefetches the file blocks
+ createReaderAndWaitForPrefetchInterruption(storeFile);
+ Waiter.waitFor(conf, (PrefetchExecutor.getPrefetchDelay() + 1000),
+ () -> PrefetchExecutor.isCompleted(storeFile));
+ BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
+ // Wait until all cache writer queues are empty
+ Waiter.waitFor(conf, 5000, () -> {
+ for (BlockingQueue<BucketCache.RAMQueueEntry> queue : bc.writerQueues) {
+ if (!queue.isEmpty()) {
+ return false;
+ }
+ }
+ return true;
+ });
+ if (bc.getStats().getFailedInserts() == 0) {
+ // With no wait time configuration, prefetch should trigger evictions once it reaches
+ // cache capacity
+ assertNotEquals(0, bc.getStats().getEvictedCount());
+ } else {
+ LOG.info("We had {} cache insert failures, which may cause cache usage "
+ + "to never reach capacity.", bc.getStats().getFailedInserts());
+ }
}
@Test
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
index 58d9385f57e..092638d8997 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
@@ -90,7 +90,7 @@ public class TestRAMCache {
MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
- RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false);
+ RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false, false);
Assert.assertNull(cache.putIfAbsent(key, re));
Assert.assertEquals(cache.putIfAbsent(key, re), re);