This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new c8f512aa3a3 HBASE-29727 Introduce a String pool for repeating filename, region and cf string fields in BlockCacheKey
c8f512aa3a3 is described below
commit c8f512aa3a31173b9d3fccc5868c9e1ae4cdfada
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Mon Dec 15 20:03:30 2025 +0000
HBASE-29727 Introduce a String pool for repeating filename, region and cf string fields in BlockCacheKey
Change-Id: I1c66e0763888f20d91d8df0c483b9b5af30c6be0
---
.../apache/hadoop/hbase/io/hfile/BlockCache.java | 17 ++
.../hadoop/hbase/io/hfile/CombinedBlockCache.java | 11 ++
.../hadoop/hbase/io/hfile/HFilePreadReader.java | 20 ++-
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 179 +++++++++++++++++----
.../io/hfile/bucket/BucketCachePersister.java | 2 +
.../master/balancer/CacheAwareLoadBalancer.java | 12 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 6 +-
.../hfile/TestBlockEvictionOnRegionMovement.java | 1 +
.../apache/hadoop/hbase/io/hfile/TestPrefetch.java | 2 +-
.../hbase/io/hfile/bucket/TestBucketCache.java | 23 ++-
.../io/hfile/bucket/TestBucketWriterThread.java | 1 +
.../io/hfile/bucket/TestPrefetchPersistence.java | 2 +
.../bucket/TestRecoveryPersistentBucketCache.java | 67 +++++++-
.../io/hfile/bucket/TestVerifyBucketCacheFile.java | 53 +++---
14 files changed, 313 insertions(+), 83 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index d3b7eb2057e..4ae63209ac3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -284,4 +284,21 @@ public interface BlockCache extends Iterable<CachedBlock>, ConfigurationObserver
default void onConfigurationChange(Configuration config) {
// noop
}
+
+ /**
+ * Checks whether the cache is enabled.
+ * @return true if the cache is enabled, false otherwise.
+ */
+ default boolean isCacheEnabled() {
+ return true;
+ }
+
+ /**
+ * Wait for the cache to finish initializing, for example after a server restart.
+ * @param timeout maximum time, in milliseconds, to wait for the cache to become enabled
+ * @return true if the cache is enabled when the wait ends, false otherwise
+ */
+ default boolean waitForCacheInitialization(long timeout) {
+ return true;
+ }
}
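
The two defaults above return true so that block cache implementations with synchronous
startup need no changes; only caches with asynchronous initialization (BucketCache, below)
override them. A caller could gate cache access on them roughly as follows (a minimal
sketch; obtaining the cache from CacheConfig is assumed context, only isCacheEnabled() and
waitForCacheInitialization() come from this patch):

    // Wait up to 10 seconds for a cache that is still recovering persisted state.
    BlockCache cache = cacheConf.getBlockCache().orElseThrow(IllegalStateException::new);
    if (cache.waitForCacheInitialization(10000) && cache.isCacheEnabled()) {
      // Cache is ready; cache reads and writes can proceed.
    }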
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 45301abe08c..ba1a5ed9ae9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -520,4 +520,15 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
return l1Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset)
+ l2Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset);
}
+
+ @Override
+ public boolean waitForCacheInitialization(long timeout) {
+ return this.l1Cache.waitForCacheInitialization(timeout)
+ && this.l2Cache.waitForCacheInitialization(timeout);
+ }
+
+ @Override
+ public boolean isCacheEnabled() {
+ return l1Cache.isCacheEnabled() && l2Cache.isCacheEnabled();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 86dcdf97065..bb09b98c39c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory;
public class HFilePreadReader extends HFileReaderImpl {
private static final Logger LOG = LoggerFactory.getLogger(HFileReaderImpl.class);
+ private static final int WAIT_TIME_FOR_CACHE_INITIALIZATION = 10 * 60 * 1000;
+
public HFilePreadReader(ReaderContext context, HFileInfo fileInfo, CacheConfig cacheConf,
Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, conf);
@@ -100,13 +102,15 @@ public class HFilePreadReader extends HFileReaderImpl {
HFileBlock block = prefetchStreamReader.readBlock(offset, onDiskSizeOfNextBlock,
/* cacheBlock= */true, /* pread= */false, false, false, null, null, true);
try {
- if (!cacheConf.isInMemory() && !cache.blockFitsIntoTheCache(block).orElse(true)) {
- LOG.warn(
- "Interrupting prefetch for file {} because block {} of size {} "
- + "doesn't fit in the available cache space.",
- path, cacheKey, block.getOnDiskSizeWithHeader());
- interrupted = true;
- break;
+ if (!cacheConf.isInMemory()) {
+ if (!cache.blockFitsIntoTheCache(block).orElse(true)) {
+ LOG.warn(
+ "Interrupting prefetch for file {} because block {} of size {} "
+ + "doesn't fit in the available cache space. isCacheEnabled: {}",
+ path, cacheKey, block.getOnDiskSizeWithHeader(), cache.isCacheEnabled());
+ interrupted = true;
+ break;
+ }
}
onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
offset += block.getOnDiskSizeWithHeader();
@@ -168,7 +172,7 @@ public class HFilePreadReader extends HFileReaderImpl {
// Deallocate data blocks
cacheConf.getBlockCache().ifPresent(cache -> {
if (evictOnClose) {
- int numEvicted = cache.evictBlocksByHfilePath(path);
+ int numEvicted = cache.evictBlocksByHfileName(name);
if (LOG.isTraceEnabled()) {
LOG.trace("On close, file= {} evicted= {} block(s)", name,
numEvicted);
}
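
The new WAIT_TIME_FOR_CACHE_INITIALIZATION constant (ten minutes, in milliseconds) is only
declared in the hunks shown above; presumably the prefetch path uses it to bound how long
it waits for an initializing cache, along these lines (a hypothetical call site, not part
of the visible diff):

    cacheConf.getBlockCache().ifPresent(cache -> {
      // Give a restarting cache a bounded window to finish recovering before prefetching.
      if (!cache.waitForCacheInitialization(WAIT_TIME_FOR_CACHE_INITIALIZATION)) {
        LOG.warn("Cache not initialized after {} ms, skipping prefetch for {}",
          WAIT_TIME_FOR_CACHE_INITIALIZATION, path);
      }
    });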
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 9174be34b08..c0745ca8f89 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -184,11 +184,23 @@ public class BucketCache implements BlockCache, HeapSize {
private transient BucketCachePersister cachePersister;
+ /**
+ * Enum to represent the state of cache
+ */
+ protected enum CacheState {
+ // Initializing: State when the cache is being initialised from persistence.
+ INITIALIZING,
+ // Enabled: State when cache is initialised and is ready.
+ ENABLED,
+ // Disabled: State when the cache is disabled.
+ DISABLED
+ }
+
/**
* Flag if the cache is enabled or not... We shut it off if there are IO errors for some time, so
* that Bucket IO exceptions/errors don't bring down the HBase server.
*/
- private volatile boolean cacheEnabled;
+ private volatile CacheState cacheState;
/**
* A list of writer queues. We have a queue per {@link WriterThread} we have
running. In other
@@ -341,6 +353,7 @@ public class BucketCache implements BlockCache, HeapSize {
this.persistencePath = persistencePath;
this.blockSize = blockSize;
this.ioErrorsTolerationDuration = ioErrorsTolerationDuration;
+ this.cacheState = CacheState.INITIALIZING;
this.allocFailLogPrevTs = 0;
@@ -352,32 +365,18 @@ public class BucketCache implements BlockCache, HeapSize {
this.ramCache = new RAMCache();
this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
+ instantiateWriterThreads();
if (isCachePersistent()) {
if (ioEngine instanceof FileIOEngine) {
startBucketCachePersisterThread();
}
- try {
- retrieveFromFile(bucketSizes);
- } catch (IOException ioex) {
- LOG.error("Can't restore from file[{}] because of ", persistencePath,
ioex);
- backingMap.clear();
- fullyCachedFiles.clear();
- backingMapValidated.set(true);
- bucketAllocator = new BucketAllocator(capacity, bucketSizes);
- regionCachedSize.clear();
- }
+ startPersistenceRetriever(bucketSizes, capacity);
} else {
bucketAllocator = new BucketAllocator(capacity, bucketSizes);
+ this.cacheState = CacheState.ENABLED;
+ startWriterThreads();
}
- final String threadName = Thread.currentThread().getName();
- this.cacheEnabled = true;
- for (int i = 0; i < writerThreads.length; ++i) {
- writerThreads[i] = new WriterThread(writerQueues.get(i));
- writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
- writerThreads[i].setDaemon(true);
- }
- startWriterThreads();
// Run the statistics thread periodically to print the cache statistics log
// TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
@@ -387,7 +386,32 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.info("Started bucket cache; ioengine=" + ioEngineName + ", capacity="
+ StringUtils.byteDesc(capacity) + ", blockSize=" + StringUtils.byteDesc(blockSize)
+ ", writerThreadNum=" + writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath="
- + persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
+ + persistencePath + ", bucketAllocator=" + BucketAllocator.class.getName());
+ }
+
+ private void startPersistenceRetriever(int[] bucketSizes, long capacity) {
+ Runnable persistentCacheRetriever = () -> {
+ try {
+ retrieveFromFile(bucketSizes);
+ LOG.info("Persistent bucket cache recovery from {} is complete.",
persistencePath);
+ } catch (IOException ioex) {
+ LOG.error("Can't restore from file[{}] because of ", persistencePath,
ioex);
+ backingMap.clear();
+ fullyCachedFiles.clear();
+ backingMapValidated.set(true);
+ try {
+ bucketAllocator = new BucketAllocator(capacity, bucketSizes);
+ } catch (BucketAllocatorException ex) {
+ LOG.error("Exception during Bucket Allocation", ex);
+ }
+ regionCachedSize.clear();
+ } finally {
+ this.cacheState = CacheState.ENABLED;
+ startWriterThreads();
+ }
+ };
+ Thread t = new Thread(persistentCacheRetriever);
+ t.start();
}
private void sanityCheckConfigs() {
@@ -413,6 +437,18 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
+ /**
+ * Called by the constructor to instantiate the writer threads.
+ */
+ private void instantiateWriterThreads() {
+ final String threadName = Thread.currentThread().getName();
+ for (int i = 0; i < this.writerThreads.length; ++i) {
+ this.writerThreads[i] = new WriterThread(this.writerQueues.get(i));
+ this.writerThreads[i].setName(threadName + "-BucketCacheWriter-" + i);
+ this.writerThreads[i].setDaemon(true);
+ }
+ }
+
/**
* Called by the constructor to start the writer threads. Used by tests that need to override
* starting the threads.
@@ -430,8 +466,9 @@ public class BucketCache implements BlockCache, HeapSize {
cachePersister.start();
}
- boolean isCacheEnabled() {
- return this.cacheEnabled;
+ @Override
+ public boolean isCacheEnabled() {
+ return this.cacheState == CacheState.ENABLED;
}
@Override
@@ -520,7 +557,7 @@ public class BucketCache implements BlockCache, HeapSize {
*/
public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean wait) {
- if (cacheEnabled) {
+ if (isCacheEnabled()) {
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
if (shouldReplaceExistingCacheBlock(cacheKey, cachedItem)) {
BucketEntry bucketEntry = backingMap.get(cacheKey);
@@ -542,7 +579,7 @@ public class BucketCache implements BlockCache, HeapSize {
protected void cacheBlockWithWaitInternal(BlockCacheKey cacheKey, Cacheable cachedItem,
boolean inMemory, boolean wait) {
- if (!cacheEnabled) {
+ if (!isCacheEnabled()) {
return;
}
if (cacheKey.getBlockType() == null && cachedItem.getBlockType() != null) {
@@ -620,7 +657,7 @@ public class BucketCache implements BlockCache, HeapSize {
@Override
public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
boolean updateCacheMetrics) {
- if (!cacheEnabled) {
+ if (!isCacheEnabled()) {
return null;
}
RAMQueueEntry re = ramCache.get(key);
@@ -782,7 +819,7 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private boolean doEvictBlock(BlockCacheKey cacheKey, BucketEntry bucketEntry,
boolean evictedByEvictionProcess) {
- if (!cacheEnabled) {
+ if (!isCacheEnabled()) {
return false;
}
boolean existedInRamCache = removeFromRamCache(cacheKey);
@@ -905,6 +942,10 @@ public class BucketCache implements BlockCache, HeapSize {
isCacheInconsistent.set(setCacheInconsistent);
}
+ protected void setCacheState(CacheState state) {
+ cacheState = state;
+ }
+
/*
* Statistics thread. Periodically output cache statistics to the log.
*/
@@ -924,6 +965,10 @@ public class BucketCache implements BlockCache, HeapSize {
}
public void logStats() {
+ if (!isCacheInitialized("BucketCache::logStats")) {
+ return;
+ }
+
long totalSize = bucketAllocator.getTotalSize();
long usedSize = bucketAllocator.getUsedSize();
long freeSize = totalSize - usedSize;
@@ -956,10 +1001,17 @@ public class BucketCache implements BlockCache, HeapSize {
}
public long acceptableSize() {
+ if (!isCacheInitialized("BucketCache::acceptableSize")) {
+ return 0;
+ }
return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
}
long getPartitionSize(float partitionFactor) {
+ if (!isCacheInitialized("BucketCache::getPartitionSize")) {
+ return 0;
+ }
+
return (long) Math.floor(bucketAllocator.getTotalSize() * partitionFactor * minFactor);
}
@@ -967,6 +1019,10 @@ public class BucketCache implements BlockCache, HeapSize {
* Return the count of bucketSizeinfos still need free space
*/
private int bucketSizesAboveThresholdCount(float minFactor) {
+ if (!isCacheInitialized("BucketCache::bucketSizesAboveThresholdCount")) {
+ return 0;
+ }
+
BucketAllocator.IndexStatistics[] stats = bucketAllocator.getIndexStatistics();
int fullCount = 0;
for (int i = 0; i < stats.length; i++) {
@@ -987,6 +1043,10 @@ public class BucketCache implements BlockCache, HeapSize {
* @param completelyFreeBucketsNeeded number of buckets to free
**/
private void freeEntireBuckets(int completelyFreeBucketsNeeded) {
+ if (!isCacheInitialized("BucketCache::freeEntireBuckets")) {
+ return;
+ }
+
if (completelyFreeBucketsNeeded != 0) {
// First we will build a set where the offsets are reference counted, usually
// this set is small around O(Handler Count) unless something else is wrong
@@ -1036,6 +1096,9 @@ public class BucketCache implements BlockCache, HeapSize {
* @param why Why we are being called
*/
void freeSpace(final String why) {
+ if (!isCacheInitialized("BucketCache::freeSpace")) {
+ return;
+ }
// Ensure only one freeSpace progress at a time
if (!freeSpaceLock.tryLock()) {
return;
@@ -1235,13 +1298,13 @@ public class BucketCache implements BlockCache, HeapSize {
public void run() {
List<RAMQueueEntry> entries = new ArrayList<>();
try {
- while (cacheEnabled && writerEnabled) {
+ while (isCacheEnabled() && writerEnabled) {
try {
try {
// Blocks
entries = getRAMQueueEntries(inputQueue, entries);
} catch (InterruptedException ie) {
- if (!cacheEnabled || !writerEnabled) {
+ if (!isCacheEnabled() || !writerEnabled) {
break;
}
}
@@ -1253,7 +1316,7 @@ public class BucketCache implements BlockCache, HeapSize {
} catch (Throwable t) {
LOG.warn("Failed doing drain", t);
}
- LOG.info(this.getName() + " exiting, cacheEnabled=" + cacheEnabled);
+ LOG.info(this.getName() + " exiting, cacheEnabled=" + isCacheEnabled());
}
}
@@ -1344,7 +1407,7 @@ public class BucketCache implements BlockCache, HeapSize {
// Index updated inside loop if success or if we can't succeed. We retry if cache is full
// when we go to add an entry by going around the loop again without upping the index.
int index = 0;
- while (cacheEnabled && index < size) {
+ while (isCacheEnabled() && index < size) {
RAMQueueEntry re = null;
try {
re = entries.get(index);
@@ -1477,10 +1540,19 @@ public class BucketCache implements BlockCache, HeapSize {
File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
LOG.debug("Persist in new chunked persistence format.");
+
persistChunkedBackingMap(fos);
+
+ LOG.debug(
+ "PersistToFile: after persisting backing map size: {},
fullycachedFiles size: {},"
+ + " file name: {}",
+ backingMap.size(), fullyCachedFiles.size(),
tempPersistencePath.getName());
} catch (IOException e) {
LOG.error("Failed to persist bucket cache to file", e);
throw e;
+ } catch (Throwable e) {
+ LOG.error("Failed during persist bucket cache to file: ", e);
+ throw e;
}
LOG.debug("Thread {} finished persisting bucket cache to file, renaming",
Thread.currentThread().getName());
@@ -1513,7 +1585,7 @@ public class BucketCache implements BlockCache, HeapSize {
backingMapValidated.set(true);
return;
}
- assert !cacheEnabled;
+ assert !isCacheEnabled();
try (FileInputStream in = new FileInputStream(persistenceFile)) {
int pblen = ProtobufMagic.lengthOfPBMagic();
@@ -1656,6 +1728,10 @@ public class BucketCache implements BlockCache, HeapSize {
blocksByHFile = pair.getSecond();
fullyCachedFiles.clear();
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
+
+ LOG.info("After retrieval Backing map size: {}, fullyCachedFiles size:
{}", backingMap.size(),
+ fullyCachedFiles.size());
+
verifyFileIntegrity(proto);
updateRegionSizeMapWhileRetrievingFromFile();
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
@@ -1719,7 +1795,7 @@ public class BucketCache implements BlockCache, HeapSize {
// Do a single read to a local variable to avoid timing issue - HBASE-24454
long ioErrorStartTimeTmp = this.ioErrorStartTime;
if (ioErrorStartTimeTmp > 0) {
- if (cacheEnabled && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
+ if (isCacheEnabled() && (now - ioErrorStartTimeTmp) > this.ioErrorsTolerationDuration) {
LOG.error("IO errors duration time has exceeded " +
ioErrorsTolerationDuration
+ "ms, disabling cache, please check your IOEngine");
disableCache();
@@ -1733,9 +1809,11 @@ public class BucketCache implements BlockCache, HeapSize {
* Used to shut down the cache -or- turn it off in the case of something broken.
*/
private void disableCache() {
- if (!cacheEnabled) return;
+ if (!isCacheEnabled()) {
+ return;
+ }
LOG.info("Disabling cache");
- cacheEnabled = false;
+ cacheState = CacheState.DISABLED;
ioEngine.shutdown();
this.scheduleThreadPool.shutdown();
for (int i = 0; i < writerThreads.length; ++i)
@@ -1819,6 +1897,9 @@ public class BucketCache implements BlockCache, HeapSize {
@Override
public long getFreeSize() {
+ if (!isCacheInitialized("BucketCache:getFreeSize")) {
+ return 0;
+ }
return this.bucketAllocator.getFreeSize();
}
@@ -1834,6 +1915,9 @@ public class BucketCache implements BlockCache, HeapSize {
@Override
public long getCurrentSize() {
+ if (!isCacheInitialized("BucketCache::getCurrentSize")) {
+ return 0;
+ }
return this.bucketAllocator.getUsedSize();
}
@@ -2358,6 +2442,10 @@ public class BucketCache implements BlockCache, HeapSize {
@Override
public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+ if (!isCacheInitialized("blockFitsIntoTheCache")) {
+ return Optional.of(false);
+ }
+
long currentUsed = bucketAllocator.getUsedSize();
boolean result = (currentUsed + block.getOnDiskSizeWithHeader()) < acceptableSize();
return Optional.of(result);
@@ -2408,4 +2496,27 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
+
+ boolean isCacheInitialized(String api) {
+ if (cacheState == CacheState.INITIALIZING) {
+ LOG.warn("Bucket initialisation pending at {}", api);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public boolean waitForCacheInitialization(long timeout) {
+ try {
+ while (cacheState == CacheState.INITIALIZING) {
+ if (timeout <= 0) {
+ break;
+ }
+ Thread.sleep(100);
+ timeout -= 100;
+ }
+ } finally {
+ return isCacheEnabled();
+ }
+ }
}
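
In waitForCacheInitialization() above, the loop polls cacheState in 100 ms slices until the
state leaves INITIALIZING or the timeout budget is spent; the return inside finally also
swallows the InterruptedException that Thread.sleep() may throw, so callers always get a
plain boolean for the state the cache ended in. A typical caller looks like this (a sketch;
the constructor arguments are placeholders for the usual configuration values):

    BucketCache cache = new BucketCache(ioEngineName, capacity, blockSize, bucketSizes,
      writerThreads, writerQLen, persistencePath);
    if (!cache.waitForCacheInitialization(60000)) {
      // Recovery still running or cache disabled; fall back to the uncached read path.
    }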
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
index e4382d2561e..2039debeef9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
@@ -62,6 +62,8 @@ public class BucketCachePersister extends Thread {
LOG.info("Finishing cache persister thread.");
} catch (InterruptedException e) {
LOG.warn("Interrupting BucketCachePersister thread.", e);
+ } catch (Throwable e) {
+ LOG.error("Failed during persisting bucket cache to file: ", e);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
index d9d79b62238..d48dc518175 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/CacheAwareLoadBalancer.java
@@ -28,6 +28,7 @@ package org.apache.hadoop.hbase.master.balancer;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KEY;
+import java.text.DecimalFormat;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
@@ -284,6 +285,9 @@ public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
return false;
}
+ DecimalFormat df = new DecimalFormat("#");
+ df.setMaximumFractionDigits(4);
+
float cacheRatioDiffThreshold = 0.6f;
// Conditions for moving the region
@@ -303,7 +307,7 @@ public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
LOG.debug(
"Region {} moved from {} to {} as the region is cached {} equally
on both servers",
cluster.regions[regionIndex].getEncodedName(),
cluster.servers[currentServerIndex],
- cluster.servers[oldServerIndex], cacheRatioOnCurrentServer);
+ cluster.servers[oldServerIndex],
df.format(cacheRatioOnCurrentServer));
}
return true;
}
@@ -320,7 +324,8 @@ public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
"Region {} moved from {} to {} as region cache ratio {} is better
than the current "
+ "cache ratio {}",
cluster.regions[regionIndex].getEncodedName(),
cluster.servers[currentServerIndex],
- cluster.servers[oldServerIndex], cacheRatioOnCurrentServer,
cacheRatioOnOldServer);
+ cluster.servers[oldServerIndex], cacheRatioOnCurrentServer,
+ df.format(cacheRatioOnCurrentServer));
}
return true;
}
@@ -329,7 +334,8 @@ public class CacheAwareLoadBalancer extends StochasticLoadBalancer {
LOG.debug(
"Region {} not moved from {} to {} with current cache ratio {} and old cache ratio {}",
cluster.regions[regionIndex], cluster.servers[currentServerIndex],
- cluster.servers[oldServerIndex], cacheRatioOnCurrentServer, cacheRatioOnOldServer);
+ cluster.servers[oldServerIndex], df.format(cacheRatioOnCurrentServer),
+ df.format(cacheRatioOnOldServer));
}
return false;
}
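
For reference, the DecimalFormat configured above trims the logged ratios to at most four
fraction digits (plain JDK behaviour, shown only as an illustration):

    DecimalFormat df = new DecimalFormat("#");
    df.setMaximumFractionDigits(4);
    df.format(12.3456789); // -> "12.3457" instead of the full double representation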
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f75e8f5ac5e..b1bc6e2a20b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1608,9 +1608,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
boolean isGracefulStop) throws IOException {
// Only allow one thread to close at a time. Serialize them so dual
// threads attempting to close will run up against each other.
- MonitoredTask status = TaskMonitor.get().createStatus(
- "Closing region " + this.getRegionInfo().getEncodedName() + (abort ? "
due to abort" : ""),
- true);
+ MonitoredTask status =
+ TaskMonitor.get().createStatus("Closing region " + this.getRegionInfo().getEncodedName()
+ + (abort ? " due to abort" : " as it is being closed"), true);
status.setStatus("Waiting for close lock");
try {
synchronized (closeLock) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
index 306cdf3e234..baacb3c18a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
@@ -139,6 +139,7 @@ public class TestBlockEvictionOnRegionMovement {
cluster.startRegionServer();
Thread.sleep(500);
+ regionServingRS.getBlockCache().get().waitForCacheInitialization(10000);
long newUsedCacheSize =
regionServingRS.getBlockCache().get().getBlockCaches()[1].getCurrentSize();
assertEquals(oldUsedCacheSize, newUsedCacheSize);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 8bbb14fd966..04d098e23b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -111,7 +111,7 @@ public class TestPrefetch {
public OpenTelemetryRule otelRule = OpenTelemetryRule.create();
@Before
- public void setUp() throws IOException {
+ public void setUp() throws IOException, InterruptedException {
conf = TEST_UTIL.getConfiguration();
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
fs = HFileSystem.get(conf);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index e42fc594f52..7b08d05c19b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
@@ -253,10 +254,8 @@ public class TestBucketCache {
public static void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
throws InterruptedException {
- while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
- Thread.sleep(100);
- }
- Thread.sleep(1000);
+ Waiter.waitFor(HBaseConfiguration.create(), 10000,
+ () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
}
public static void waitUntilAllFlushedToBucket(BucketCache cache) throws InterruptedException {
@@ -330,6 +329,7 @@ public class TestBucketCache {
try {
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
@@ -357,6 +357,7 @@ public class TestBucketCache {
try {
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
@@ -374,6 +375,7 @@ public class TestBucketCache {
try {
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
@@ -390,6 +392,8 @@ public class TestBucketCache {
assertTrue(new File(persistencePath).exists());
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
} finally {
if (bucketCache != null) {
@@ -427,6 +431,7 @@ public class TestBucketCache {
try {
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
smallBucketSizes, writeThreads, writerQLen, persistencePath);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
assertFalse(new File(persistencePath).exists());
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
@@ -440,6 +445,7 @@ public class TestBucketCache {
public void testRetrieveFromFileWithoutPersistence() throws Exception {
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
try {
final Path testDir = createAndGetTestDir();
String ioEngineName = "file:" + testDir + "/bucket.cache";
@@ -458,6 +464,7 @@ public class TestBucketCache {
bucketCache.shutdown();
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
assertEquals(0, bucketCache.getAllocator().getUsedSize());
} finally {
bucketCache.shutdown();
@@ -486,6 +493,7 @@ public class TestBucketCache {
BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
+ assertTrue(cache.waitForCacheInitialization(10000));
validateGetPartitionSize(cache, 0.1f, 0.5f);
validateGetPartitionSize(cache, 0.7f, 0.5f);
@@ -522,6 +530,7 @@ public class TestBucketCache {
BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
+ assertTrue(cache.waitForCacheInitialization(10000));
assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
cache.getAcceptableFactor(), 0);
@@ -592,6 +601,7 @@ public class TestBucketCache {
}
BucketCache cache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
+ assertTrue(cache.waitForCacheInitialization(10000));
assertTrue("Created BucketCache and expected it to succeed: " +
expectSuccess[i]
+ ", but it actually was: " + !expectSuccess[i], expectSuccess[i]);
} catch (IllegalArgumentException e) {
@@ -818,6 +828,7 @@ public class TestBucketCache {
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);
@@ -841,6 +852,7 @@ public class TestBucketCache {
// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
@@ -869,6 +881,7 @@ public class TestBucketCache {
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, 1, 1, persistencePath);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);
@@ -900,6 +913,7 @@ public class TestBucketCache {
// restore cache from file
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
assertEquals(usedByteSize, bucketCache.getAllocator().getUsedSize());
for (HFileBlockPair hfileBlockPair : hfileBlockPairs) {
@@ -1031,6 +1045,7 @@ public class TestBucketCache {
String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize,
constructedBlockSizes, 1, 1, null);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);
HFileBlockPair[] hfileBlockPairs =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index 429fffa38f6..facbe7c50d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -84,6 +84,7 @@ public class TestBucketWriterThread {
final int writerThreadsCount = 1;
this.bc = new MockBucketCache("offheap", capacity, 1, new int[] { 1 }, writerThreadsCount,
capacity, null, 100/* Tolerate ioerrors for 100ms */);
+ this.bc.waitForCacheInitialization(10000);
assertEquals(writerThreadsCount, bc.writerThreads.length);
assertEquals(writerThreadsCount, bc.writerQueues.size());
// Get reference to our single WriterThread instance.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
index 6d213ac8b40..a2909c005fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
@@ -109,6 +109,7 @@ public class TestPrefetchPersistence {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
+ bucketCache.waitForCacheInitialization(10000);
cacheConf = new CacheConfig(conf, bucketCache);
long usedSize = bucketCache.getAllocator().getUsedSize();
@@ -127,6 +128,7 @@ public class TestPrefetchPersistence {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
+ bucketCache.waitForCacheInitialization(10000);
cacheConf = new CacheConfig(conf, bucketCache);
assertTrue(usedSize != 0);
assertTrue(bucketCache.fullyCachedFiles.containsKey(storeFile.getName()));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
index 2ba20300005..0726a77d2c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -24,12 +24,14 @@ import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FAC
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
@@ -63,6 +65,9 @@ public class TestRecoveryPersistentBucketCache {
BucketCache bucketCache = new BucketCache("file:" + testDir +
"/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir +
"/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(1000));
+ assertTrue(
+ bucketCache.isCacheInitialized("testBucketCacheRecovery") &&
bucketCache.isCacheEnabled());
CacheTestUtils.HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(8192, 4);
@@ -98,7 +103,8 @@ public class TestRecoveryPersistentBucketCache {
BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, conf);
- Thread.sleep(100);
+ assertTrue(newBucketCache.waitForCacheInitialization(1000));
+
assertEquals(3, newBucketCache.backingMap.size());
assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false));
@@ -123,6 +129,7 @@ public class TestRecoveryPersistentBucketCache {
BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
@@ -137,7 +144,7 @@ public class TestRecoveryPersistentBucketCache {
BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
DEFAULT_ERROR_TOLERATION_DURATION, conf);
- Thread.sleep(100);
+ assertTrue(newBucketCache.waitForCacheInitialization(10000));
assertEquals(4, newBucketCache.backingMap.size());
newBucketCache.evictBlocksByHfileName(blocks[0].getBlockName().getHfileName());
assertEquals(3, newBucketCache.backingMap.size());
@@ -202,11 +209,61 @@ public class TestRecoveryPersistentBucketCache {
TEST_UTIL.cleanupTestDir();
}
+ @Test
+ public void testValidateCacheInitialization() throws Exception {
+ HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ Path testDir = TEST_UTIL.getDataTestDir();
+ TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+ Configuration conf = HBaseConfiguration.create();
+ // Disables the persister thread by setting its interval to MAX_VALUE
+ conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+ int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+ BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+
+ CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
+
+ // Add four blocks
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+ // saves the current state of the cache
+ bucketCache.persistToFile();
+
+ BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(newBucketCache.waitForCacheInitialization(10000));
+
+ // Set the state of bucket cache to INITIALIZING
+ newBucketCache.setCacheState(BucketCache.CacheState.INITIALIZING);
+
+ // Validate that zero values are returned for the cache being initialized.
+ assertEquals(0, newBucketCache.acceptableSize());
+ assertEquals(0, newBucketCache.getPartitionSize(1));
+ assertEquals(0, newBucketCache.getFreeSize());
+ assertEquals(0, newBucketCache.getCurrentSize());
+ assertEquals(false, newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());
+
+ newBucketCache.setCacheState(BucketCache.CacheState.ENABLED);
+
+ // Validate that non-zero values are returned for enabled cache
+ assertTrue(newBucketCache.acceptableSize() > 0);
+ assertTrue(newBucketCache.getPartitionSize(1) > 0);
+ assertTrue(newBucketCache.getFreeSize() > 0);
+ assertTrue(newBucketCache.getCurrentSize() > 0);
+ assertTrue(newBucketCache.blockFitsIntoTheCache(blocks[0].getBlock()).get());
+
+ TEST_UTIL.cleanupTestDir();
+ }
+
private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
throws InterruptedException {
- while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
- Thread.sleep(100);
- }
+ Waiter.waitFor(HBaseConfiguration.create(), 12000,
+ () -> (cache.backingMap.containsKey(cacheKey) && !cache.ramCache.containsKey(cacheKey)));
}
// BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
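
The Waiter.waitFor() calls introduced in these tests poll the supplied predicate until it
holds or the timeout elapses, failing the test instead of hanging in an unbounded
sleep-loop as before. The usage pattern mirrors the diff (placeholder names):

    // Fail the test if the block has not moved from the RAM queue to the backing map in 5s.
    Waiter.waitFor(HBaseConfiguration.create(), 5000,
      () -> cache.backingMap.containsKey(key) && !cache.ramCache.containsKey(key));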
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index 98131cd2b7a..5086e1d7c45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -108,7 +108,7 @@ public class TestVerifyBucketCacheFile {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION,
conf);
- long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(bucketCache.waitForCacheInitialization(10000));long usedSize
= bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
CacheTestUtils.HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
@@ -125,30 +125,30 @@ public class TestVerifyBucketCacheFile {
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION,
conf);
waitPersistentCacheValidation(conf, bucketCache);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
// persist cache to file
bucketCache.shutdown();
- // 2.delete bucket cache file
- final java.nio.file.Path cacheFile =
- FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
- assertTrue(Files.deleteIfExists(cacheFile));
- // can't restore cache from file
- recoveredBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
- testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION,
conf);
- waitPersistentCacheValidation(conf, recoveredBucketCache);
- assertEquals(0, recoveredBucketCache.getAllocator().getUsedSize());
- assertEquals(0, recoveredBucketCache.backingMap.size());
- // Add blocks
- for (CacheTestUtils.HFileBlockPair block : blocks) {
- cacheAndWaitUntilFlushedToBucket(recoveredBucketCache, block.getBlockName(),
- block.getBlock());
- }
- usedSize = recoveredBucketCache.getAllocator().getUsedSize();
- assertNotEquals(0, usedSize);
- // persist cache to file
- recoveredBucketCache.shutdown();
+ // 2.delete bucket cache file
+ final java.nio.file.Path cacheFile =
+ FileSystems.getDefault().getPath(testDir.toString(), "bucket.cache");
+ assertTrue(Files.deleteIfExists(cacheFile));
+ // can't restore cache from file
+ bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, testDir +
"/bucket.persistence");
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ // Add blocks
+ for (CacheTestUtils.HFileBlockPair block : blocks) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+ }
+ usedSize = bucketCache.getAllocator().getUsedSize();
+ assertNotEquals(0, usedSize);
+ // persist cache to file
+ bucketCache.shutdown();
// 3.delete backingMap persistence file
final java.nio.file.Path mapFile =
@@ -186,7 +186,7 @@ public class TestVerifyBucketCacheFile {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
mapFileName,
DEFAULT_ERROR_TOLERATION_DURATION, conf);
- long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
+ long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
CacheTestUtils.HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
@@ -206,6 +206,7 @@ public class TestVerifyBucketCacheFile {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
mapFileName,
DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
waitPersistentCacheValidation(conf, bucketCache);
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
@@ -237,7 +238,7 @@ public class TestVerifyBucketCacheFile {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION,
conf);
- long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(bucketCache.waitForCacheInitialization(10000));long usedSize
= bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
CacheTestUtils.HFileBlockPair[] blocks =
@@ -299,7 +300,7 @@ public class TestVerifyBucketCacheFile {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION,
conf);
- long usedSize = bucketCache.getAllocator().getUsedSize();
+ assertTrue(bucketCache.waitForCacheInitialization(10000));long usedSize
= bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
Pair<String, Long> myPair = new Pair<>();
@@ -326,7 +327,7 @@ public class TestVerifyBucketCacheFile {
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION,
conf);
waitPersistentCacheValidation(conf, bucketCache);
- assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+
assertTrue(bucketCache.waitForCacheInitialization(10000));assertEquals(usedSize,
bucketCache.getAllocator().getUsedSize());
assertEquals(blockCount, bucketCache.backingMap.size());
} finally {
if (bucketCache != null) {
@@ -359,6 +360,7 @@ public class TestVerifyBucketCacheFile {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
mapFileName,
DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
CacheTestUtils.HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
@@ -426,6 +428,7 @@ public class TestVerifyBucketCacheFile {
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
mapFileName,
DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertTrue(bucketCache.waitForCacheInitialization(10000));
CacheTestUtils.HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);