This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 95923272f07 HBASE-28170 Put the cached time at the beginning of the block; run cache validation in the background when retrieving the persistent cache (#5471)
95923272f07 is described below
commit 95923272f07e66bf2e97f7e9fc87611b870a68e7
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Tue Oct 24 09:30:11 2023 +0100
HBASE-28170 Put the cached time at the beginning of the block; run cache validation in the background when retrieving the persistent cache (#5471)
Signed-off-by: Peter Somogyi <[email protected]>
---
.../hadoop/hbase/io/hfile/HFilePreadReader.java | 39 +++----
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 100 +++++++++++-----
.../io/hfile/bucket/BucketCachePersister.java | 41 +++++--
.../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 16 ++-
.../io/hfile/TestPrefetchWithBucketCache.java | 2 +-
.../bucket/TestRecoveryPersistentBucketCache.java | 126 +++++++++++++++++++++
.../io/hfile/bucket/TestVerifyBucketCacheFile.java | 12 +-
7 files changed, 266 insertions(+), 70 deletions(-)
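
Before the diff itself: the patch moves the cached-time stamp from the tail of a persisted bucket entry to its head, so recovery can validate an entry by reading just its first 8 bytes. The following is only an illustrative sketch of that layout and check, with invented names; it is not part of the patch.

import java.nio.ByteBuffer;

// Illustrative only: approximates the slot layout written for a persistent
// FileIOEngine entry after this change. Class and method names are made up.
public class CachedBlockLayoutSketch {

  // The cached time now goes first, the block payload follows it, and the
  // block metadata stays at the tail of the allocated slot.
  static ByteBuffer writeSlot(long cachedTime, byte[] blockData, byte[] metadata, int slotLen) {
    ByteBuffer slot = ByteBuffer.allocate(slotLen);
    slot.putLong(0, cachedTime);
    slot.position(Long.BYTES);
    slot.put(blockData);
    slot.position(slotLen - metadata.length);
    slot.put(metadata);
    return slot;
  }

  // Approximates the recovery-time check: read the leading 8 bytes at the
  // entry offset and compare them with the time recorded in the bucket entry.
  static boolean cacheTimeMatches(ByteBuffer slot, long expectedCachedTime) {
    return slot.getLong(0) == expectedCachedTime;
  }

  public static void main(String[] args) {
    long now = System.nanoTime();
    ByteBuffer slot = writeSlot(now, "block-bytes".getBytes(), "meta".getBytes(), 64);
    System.out.println("cached time matches: " + cacheTimeMatches(slot, now));
  }
}

Keeping the stamp at a fixed offset from the start of the slot is what lets the recovery path below validate an entry without knowing the block's length.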
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index f9c0ae59242..1ac9a4ffb84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -40,7 +40,9 @@ public class HFilePreadReader extends HFileReaderImpl {
Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, conf);
final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
- BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent(bc -> fileAlreadyCached
+ Optional<BucketCache> bucketCacheOptional =
+ BucketCache.getBucketCacheFromCacheConfig(cacheConf);
+ bucketCacheOptional.ifPresent(bc -> fileAlreadyCached
.setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : true));
// Prefetch file blocks upon open if requested
if (
@@ -65,8 +67,6 @@ public class HFilePreadReader extends HFileReaderImpl {
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset,
end));
}
- Optional<BucketCache> bucketCacheOptional =
- BucketCache.getBuckedCacheFromCacheConfig(cacheConf);
// Don't use BlockIterator here, because it's designed to read load-on-open section.
long onDiskSizeOfNextBlock = -1;
while (offset < end) {
@@ -78,21 +78,24 @@ public class HFilePreadReader extends HFileReaderImpl {
// to the next block without actually going read all the way to the cache.
if (bucketCacheOptional.isPresent()) {
BucketCache cache = bucketCacheOptional.get();
- BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
- BucketEntry entry = cache.getBackingMap().get(cacheKey);
- if (entry != null) {
- cacheKey = new BlockCacheKey(name, offset);
- entry = cache.getBackingMap().get(cacheKey);
- if (entry == null) {
- LOG.debug("No cache key {}, we'll read and cache it",
cacheKey);
+ if (cache.getBackingMapValidated().get()) {
+ BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
+ BucketEntry entry = cache.getBackingMap().get(cacheKey);
+ if (entry != null) {
+ cacheKey = new BlockCacheKey(name, offset);
+ entry = cache.getBackingMap().get(cacheKey);
+ if (entry == null) {
+ LOG.debug("No cache key {}, we'll read and cache it",
cacheKey);
+ } else {
+ offset += entry.getOnDiskSizeWithHeader();
+ LOG.debug(
+ "Found cache key {}. Skipping prefetch, the block is
already cached.",
+ cacheKey);
+ continue;
+ }
} else {
- offset += entry.getOnDiskSizeWithHeader();
- LOG.debug("Found cache key {}. Skipping prefetch, the
block is already cached.",
- cacheKey);
- continue;
+ LOG.debug("No entry in the backing map for cache key {}",
cacheKey);
}
- } else {
- LOG.debug("No entry in the backing map for cache key {}",
cacheKey);
}
}
// Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
@@ -111,9 +114,7 @@ public class HFilePreadReader extends HFileReaderImpl {
block.release();
}
}
- BucketCache.getBuckedCacheFromCacheConfig(cacheConf)
- .ifPresent(bc -> bc.fileCacheCompleted(path.getName()));
-
+ bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path.getName()));
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
if (LOG.isTraceEnabled()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index bc5e7e7c9b9..c082273b53b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -149,6 +149,8 @@ public class BucketCache implements BlockCache, HeapSize {
// In this map, store the block's meta data like offset, length
transient Map<BlockCacheKey, BucketEntry> backingMap;
+ private AtomicBoolean backingMapValidated = new AtomicBoolean(false);
+
/** Set of files for which prefetch is completed */
final Map<String, Boolean> fullyCachedFiles = new ConcurrentHashMap<>();
@@ -312,7 +314,6 @@ public class BucketCache implements BlockCache, HeapSize {
this.allocFailLogPrevTs = 0;
- bucketAllocator = new BucketAllocator(capacity, bucketSizes);
for (int i = 0; i < writerThreads.length; ++i) {
writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
}
@@ -329,10 +330,14 @@ public class BucketCache implements BlockCache, HeapSize {
try {
retrieveFromFile(bucketSizes);
} catch (IOException ioex) {
+ LOG.error("Can't restore from file[{}] because of ", persistencePath,
ioex);
backingMap.clear();
fullyCachedFiles.clear();
- LOG.error("Can't restore from file[" + persistencePath + "] because of
", ioex);
+ backingMapValidated.set(true);
+ bucketAllocator = new BucketAllocator(capacity, bucketSizes);
}
+ } else {
+ bucketAllocator = new BucketAllocator(capacity, bucketSizes);
}
final String threadName = Thread.currentThread().getName();
this.cacheEnabled = true;
@@ -385,6 +390,7 @@ public class BucketCache implements BlockCache, HeapSize {
}
void startBucketCachePersisterThread() {
+ LOG.info("Starting BucketCachePersisterThread");
cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
cachePersister.setDaemon(true);
cachePersister.start();
@@ -540,6 +546,7 @@ public class BucketCache implements BlockCache, HeapSize {
} else {
this.blockNumber.increment();
this.heapSize.add(cachedItem.heapSize());
+ blocksByHFile.add(cacheKey);
}
}
@@ -600,6 +607,7 @@ public class BucketCache implements BlockCache, HeapSize {
// the cache map state might differ from the actual cache. If we reach this block,
// we should remove the cache key entry from the backing map
backingMap.remove(key);
+ fullyCachedFiles.remove(key.getHfileName());
LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
} catch (IOException ioex) {
LOG.error("Failed reading block " + key + " from bucket cache", ioex);
@@ -695,6 +703,7 @@ public class BucketCache implements BlockCache, HeapSize {
} else {
return bucketEntryToUse.withWriteLock(offsetLock, () -> {
if (backingMap.remove(cacheKey, bucketEntryToUse)) {
+ LOG.debug("removed key {} from back map in the evict process",
cacheKey);
blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache,
evictedByEvictionProcess);
return true;
}
@@ -1266,6 +1275,8 @@ public class BucketCache implements BlockCache, HeapSize {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
justification = "false positive, try-with-resources ensures close is called.")
void persistToFile() throws IOException {
+ LOG.debug("Thread {} started persisting bucket cache to file",
+ Thread.currentThread().getName());
if (!isCachePersistent()) {
throw new IOException("Attempt to persist non-persistent cache mappings!");
}
@@ -1273,14 +1284,19 @@ public class BucketCache implements BlockCache, HeapSize {
try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
fos.write(ProtobufMagic.PB_MAGIC);
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
+ } catch (IOException e) {
+ LOG.error("Failed to persist bucket cache to file", e);
+ throw e;
}
+ LOG.debug("Thread {} finished persisting bucket cache to file, renaming",
+ Thread.currentThread().getName());
if (!tempPersistencePath.renameTo(new File(persistencePath))) {
LOG.warn("Failed to commit cache persistent file. We might lose cached
blocks if "
+ "RS crashes/restarts before we successfully checkpoint again.");
}
}
- private boolean isCachePersistent() {
+ public boolean isCachePersistent() {
return ioEngine.isPersistent() && persistencePath != null;
}
@@ -1288,8 +1304,14 @@ public class BucketCache implements BlockCache, HeapSize {
* @see #persistToFile()
*/
private void retrieveFromFile(int[] bucketSizes) throws IOException {
+ LOG.info("Started retrieving bucket cache from file");
File persistenceFile = new File(persistencePath);
if (!persistenceFile.exists()) {
+ LOG.warn("Persistence file missing! "
+ + "It's ok if it's first run after enabling persistent cache.");
+ bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
+ blockNumber.add(backingMap.size());
+ backingMapValidated.set(true);
return;
}
assert !cacheEnabled;
@@ -1311,6 +1333,7 @@ public class BucketCache implements BlockCache, HeapSize {
parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
blockNumber.add(backingMap.size());
+ LOG.info("Bucket cache retrieved from file successfully");
}
}
@@ -1383,27 +1406,43 @@ public class BucketCache implements BlockCache, HeapSize {
try {
((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
algorithm);
+ backingMapValidated.set(true);
} catch (IOException e) {
LOG.warn("Checksum for cache file failed. "
- + "We need to validate each cache key in the backing map. This may
take some time...");
- long startTime = EnvironmentEdgeManager.currentTime();
- int totalKeysOriginally = backingMap.size();
- for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
- try {
- ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
- } catch (IOException e1) {
- LOG.debug("Check for key {} failed. Removing it from map.",
keyEntry.getKey());
- backingMap.remove(keyEntry.getKey());
- fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
+ + "We need to validate each cache key in the backing map. "
+ + "This may take some time, so we'll do it in a background thread,");
+ Runnable cacheValidator = () -> {
+ while (bucketAllocator == null) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
}
- }
- LOG.info("Finished validating {} keys in the backing map. Recovered:
{}. This took {}ms.",
- totalKeysOriginally, backingMap.size(),
- (EnvironmentEdgeManager.currentTime() - startTime));
+ long startTime = EnvironmentEdgeManager.currentTime();
+ int totalKeysOriginally = backingMap.size();
+ for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
+ try {
+ ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
+ } catch (IOException e1) {
+ LOG.debug("Check for key {} failed. Evicting.",
keyEntry.getKey());
+ evictBlock(keyEntry.getKey());
+ fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
+ }
+ }
+ backingMapValidated.set(true);
+ LOG.info("Finished validating {} keys in the backing map. Recovered:
{}. This took {}ms.",
+ totalKeysOriginally, backingMap.size(),
+ (EnvironmentEdgeManager.currentTime() - startTime));
+ };
+ Thread t = new Thread(cacheValidator);
+ t.setDaemon(true);
+ t.start();
}
} else {
// if has not checksum, it means the persistence file is old format
LOG.info("Persistent file is old format, it does not support verifying
file integrity!");
+ backingMapValidated.set(true);
}
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
}
@@ -1432,6 +1471,7 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private void disableCache() {
if (!cacheEnabled) return;
+ LOG.info("Disabling cache");
cacheEnabled = false;
ioEngine.shutdown();
this.scheduleThreadPool.shutdown();
@@ -1456,11 +1496,15 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
+ persistencePath);
if (ioEngine.isPersistent() && persistencePath != null) {
- if (cachePersister != null) {
- cachePersister.interrupt();
- }
try {
join();
+ if (cachePersister != null) {
+ LOG.info("Shutting down cache persister thread.");
+ cachePersister.shutdown();
+ while (cachePersister.isAlive()) {
+ Thread.sleep(10);
+ }
+ }
persistToFile();
} catch (IOException ex) {
LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
@@ -1665,17 +1709,17 @@ public class BucketCache implements BlockCache, HeapSize {
HFileBlock block = (HFileBlock) data;
ByteBuff sliceBuf = block.getBufferReadOnly();
block.getMetaData(metaBuff);
- ioEngine.write(sliceBuf, offset);
- // adds the cache time after the block and metadata part
+ // adds the cache time prior to the block and metadata part
if (isCachePersistent) {
- ioEngine.write(metaBuff, offset + len - metaBuff.limit() - Long.BYTES);
ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
buffer.putLong(bucketEntry.getCachedTime());
buffer.rewind();
- ioEngine.write(buffer, (offset + len - Long.BYTES));
+ ioEngine.write(buffer, offset);
+ ioEngine.write(sliceBuf, (offset + Long.BYTES));
} else {
- ioEngine.write(metaBuff, offset + len - metaBuff.limit());
+ ioEngine.write(sliceBuf, offset);
}
+ ioEngine.write(metaBuff, offset + len - metaBuff.limit());
} else {
// Only used for testing.
ByteBuffer bb = ByteBuffer.allocate(len);
@@ -1917,11 +1961,15 @@ public class BucketCache implements BlockCache, HeapSize {
return backingMap;
}
+ public AtomicBoolean getBackingMapValidated() {
+ return backingMapValidated;
+ }
+
public Map<String, Boolean> getFullyCachedFiles() {
return fullyCachedFiles;
}
- public static Optional<BucketCache> getBuckedCacheFromCacheConfig(CacheConfig cacheConf) {
+ public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
if (cacheConf.getBlockCache().isPresent()) {
BlockCache bc = cacheConf.getBlockCache().get();
if (bc instanceof CombinedBlockCache) {
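
For context before the next file: the validation that used to run inline in retrieveFromFile now runs on a daemon thread, with an AtomicBoolean gating callers such as the prefetch path in HFilePreadReader above. The sketch below is only a simplified, hypothetical illustration of that pattern; the names are invented and it is not the patch code.

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: background validation of recovered cache entries.
public class BackgroundValidationSketch {
  final Map<String, Long> backingMap = new ConcurrentHashMap<>();
  final AtomicBoolean backingMapValidated = new AtomicBoolean(false);

  void checkEntry(String key) throws IOException {
    // stand-in for a per-entry cached-time check against the cache file
  }

  void startBackgroundValidation() {
    Thread t = new Thread(() -> {
      for (String key : backingMap.keySet()) {
        try {
          checkEntry(key);
        } catch (IOException e) {
          backingMap.remove(key); // evict entries whose cached time no longer matches
        }
      }
      backingMapValidated.set(true); // callers may now trust the recovered map
    });
    t.setDaemon(true);
    t.start();
  }

  boolean canSkipPrefetchFor(String key) {
    // mirrors the prefetch path: only trust the map once validation finished
    return backingMapValidated.get() && backingMap.containsKey(key);
  }
}
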
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
index dbea4f3f325..e4382d2561e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +29,8 @@ public class BucketCachePersister extends Thread {
private final long intervalMillis;
private static final Logger LOG = LoggerFactory.getLogger(BucketCachePersister.class);
+ private AtomicBoolean shutdown = new AtomicBoolean(false);
+
public BucketCachePersister(BucketCache cache, long intervalMillis) {
super("bucket-cache-persister");
this.cache = cache;
@@ -36,20 +39,34 @@ public class BucketCachePersister extends Thread {
}
public void run() {
- while (true) {
- try {
- Thread.sleep(intervalMillis);
- if (cache.isCacheInconsistent()) {
- LOG.debug("Cache is inconsistent, persisting to disk");
- cache.persistToFile();
- cache.setCacheInconsistent(false);
+ try {
+ while (true) {
+ try {
+ Thread.sleep(intervalMillis);
+ if (cache.isCacheInconsistent()) {
+ LOG.debug("Cache is inconsistent, persisting to disk");
+ cache.persistToFile();
+ cache.setCacheInconsistent(false);
+ }
+ // Thread.interrupt may cause an InterruptedException inside the util method used for checksum
+ // calculation in persistToFile. That util currently swallows the exception, causing this
+ // thread to not get interrupted, so we added this flag to indicate that the persister thread
+ // should stop.
+ if (shutdown.get()) {
+ break;
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception in BucketCachePersister.", e);
}
- } catch (IOException e) {
- LOG.warn("IOException in BucketCachePersister {} ", e.getMessage());
- } catch (InterruptedException iex) {
- LOG.warn("InterruptedException in BucketCachePersister {} ",
iex.getMessage());
- break;
}
+ LOG.info("Finishing cache persister thread.");
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupting BucketCachePersister thread.", e);
}
}
+
+ public void shutdown() {
+ this.shutdown.set(true);
+ this.interrupt();
+ }
}
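
The persister change above replaces a bare interrupt with a cooperative shutdown flag, because the checksum utility invoked from persistToFile can swallow the interrupt. The following minimal sketch of that pattern uses hypothetical names and is not the patch code.

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a periodic worker that also honours a shutdown flag, in
// case an InterruptedException gets swallowed somewhere inside its loop body.
public class PersisterShutdownSketch extends Thread {
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  private final long intervalMillis;

  PersisterShutdownSketch(long intervalMillis) {
    super("persister-sketch");
    this.intervalMillis = intervalMillis;
  }

  @Override
  public void run() {
    try {
      while (true) {
        Thread.sleep(intervalMillis);
        // ... do the periodic work here ...
        if (shutdown.get()) {
          break; // the flag still stops us even if an interrupt was swallowed
        }
      }
    } catch (InterruptedException e) {
      // interrupted while sleeping: exit as well
    }
  }

  public void requestShutdown() {
    shutdown.set(true);
    interrupt();
  }
}
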
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 38f9db04b6d..ba431c4c6dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -149,15 +149,14 @@ public class FileIOEngine extends PersistentIOEngine {
}
}
if (maintainPersistence) {
- dstBuff.position(length - Long.BYTES);
+ dstBuff.rewind();
long cachedNanoTime = dstBuff.getLong();
if (be.getCachedTime() != cachedNanoTime) {
dstBuff.release();
- throw new HBaseIOException("The cached time recorded within the cached block differs "
- + "from its bucket entry, so it might not be the same.");
+ throw new HBaseIOException("The cached time recorded within the cached block: "
+ + cachedNanoTime + " differs from its bucket entry: " + be.getCachedTime());
}
- dstBuff.rewind();
- dstBuff.limit(length - Long.BYTES);
+ dstBuff.limit(length);
dstBuff = dstBuff.slice();
} else {
dstBuff.rewind();
@@ -167,10 +166,9 @@ public class FileIOEngine extends PersistentIOEngine {
void checkCacheTime(BucketEntry be) throws IOException {
long offset = be.offset();
- int length = be.getLength();
ByteBuff dstBuff = be.allocator.allocate(Long.BYTES);
try {
- accessFile(readAccessor, dstBuff, (offset + length - Long.BYTES));
+ accessFile(readAccessor, dstBuff, offset);
} catch (IOException ioe) {
dstBuff.release();
throw ioe;
@@ -179,8 +177,8 @@ public class FileIOEngine extends PersistentIOEngine {
long cachedNanoTime = dstBuff.getLong();
if (be.getCachedTime() != cachedNanoTime) {
dstBuff.release();
- throw new HBaseIOException("The cached time recorded within the cached block differs "
- + "from its bucket entry, so it might not be the same.");
+ throw new HBaseIOException("The cached time recorded within the cached block: "
+ + cachedNanoTime + " differs from its bucket entry: " + be.getCachedTime());
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
index e4330308243..e5c6b42fcc4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
@@ -106,7 +106,7 @@ public class TestPrefetchWithBucketCache {
// Prefetches the file blocks
LOG.debug("First read should prefetch the blocks.");
readStoreFile(storeFile);
- BucketCache bc = BucketCache.getBuckedCacheFromCacheConfig(cacheConf).get();
+ BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
// Our file should have 6 DATA blocks. We should wait for all of them to be cached
Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
new file mode 100644
index 00000000000..ad91d01f8cf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Basic test for check file's integrity before start BucketCache in fileIOEngine
+ */
+@Category(SmallTests.class)
+public class TestRecoveryPersistentBucketCache {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRecoveryPersistentBucketCache.class);
+
+ final long capacitySize = 32 * 1024 * 1024;
+ final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
+ final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
+
+ @Test
+ public void testBucketCacheRecovery() throws Exception {
+ HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+ Path testDir = TEST_UTIL.getDataTestDir();
+ TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+ Configuration conf = HBaseConfiguration.create();
+ // Disables the persister thread by setting its interval to MAX_VALUE
+ conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+ int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+ BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+ CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
+
+ CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1);
+ // Add four blocks
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+ // saves the current state of the cache
+ bucketCache.persistToFile();
+ // evicts the 4th block
+ bucketCache.evictBlock(blocks[3].getBlockName());
+ // now adds a 5th block to bucket cache. This block is half the size of the previous
+ // blocks, and it will be added in the same offset of the previous evicted block.
+ // This overwrites part of the 4th block. Because we persisted only up to the
+ // 4th block addition, recovery would try to read the whole 4th block, but the cached time
+ // validation will fail, and we'll recover only the first three blocks
+ cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(),
+ smallerBlocks[0].getBlock());
+
+ // Creates new bucket cache instance without persisting to file after evicting 4th block
+ // and caching 5th block. Here the cache file has the first three blocks, followed by the
+ // 5th block and the second half of 4th block (we evicted 4th block, freeing up its
+ // offset in the cache, then added 5th block which is half the size of other blocks, so it's
+ // going to override the first half of the 4th block in the cache). That's fine because
+ // the in-memory backing map has the right blocks and related offsets. However, the
+ // persistent map file only has information about the first four blocks. We validate the
+ // cache time recorded in the back map against the block data in the cache. This is recorded
+ // in the cache as the first 8 bytes of a block, so the 4th block had its first 8 bytes
+ // now overridden by the 5th block, causing this check to fail and removal of
+ // the 4th block from the backing map.
+ BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ 8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ Thread.sleep(100);
+ assertEquals(3, newBucketCache.backingMap.size());
+ assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+ assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false));
+ assertEquals(blocks[0].getBlock(),
+ newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
+ assertEquals(blocks[1].getBlock(),
+ newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+ assertEquals(blocks[2].getBlock(),
+ newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
+ throws InterruptedException {
+ while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
+ Thread.sleep(100);
+ }
+ }
+
+ // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
+ // threads will flush it to the bucket and put reference entry in backingMap.
+ private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
+ Cacheable block) throws InterruptedException {
+ cache.cacheBlock(cacheKey, block);
+ waitUntilFlushedToBucket(cache, cacheKey);
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index 6fdea844aa3..0d33fb079bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -127,6 +127,7 @@ public class TestVerifyBucketCacheFile {
bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, testDir +
"/bucket.persistence");
+ Thread.sleep(100);
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
// Add blocks
@@ -146,6 +147,7 @@ public class TestVerifyBucketCacheFile {
bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, testDir +
"/bucket.persistence");
+ Thread.sleep(100);
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
@@ -201,9 +203,12 @@ public class TestVerifyBucketCacheFile {
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
- BucketCache bucketCache =
- new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, testDir +
"/bucket.persistence");
+ Configuration conf = HBaseConfiguration.create();
+ // Disables the persister thread by setting its interval to MAX_VALUE
+ conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+ BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+ testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
@@ -228,6 +233,7 @@ public class TestVerifyBucketCacheFile {
bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, testDir +
"/bucket.persistence");
+ Thread.sleep(100);
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());