This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 94ed6add066 HBASE-28004 Persistent cache map can get corrupt if crash happens midway through the write (#5341)
94ed6add066 is described below
commit 94ed6add066932b718176fc20a4553b835cf23cf
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Wed Aug 23 10:17:21 2023 +0100
HBASE-28004 Persistent cache map can get corrupt if crash happens midway through the write (#5341)
Signed-off-by: Ankit Singhal <[email protected]>
Reviewed-by: Rahul Agarkar <[email protected]>
---
.../main/protobuf/server/io/BucketCacheEntry.proto | 3 +
.../apache/hadoop/hbase/io/hfile/CacheConfig.java | 2 -
.../hadoop/hbase/io/hfile/HFilePreadReader.java | 41 +++-
.../hadoop/hbase/io/hfile/PrefetchExecutor.java | 89 +--------
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 167 ++++++++++++----
.../hadoop/hbase/io/hfile/bucket/BucketEntry.java | 28 ++-
.../hbase/io/hfile/bucket/BucketProtoUtils.java | 10 +-
.../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 37 +++-
.../hfile/TestBlockEvictionOnRegionMovement.java | 1 -
.../hadoop/hbase/io/hfile/TestPrefetchRSClose.java | 4 -
.../io/hfile/TestPrefetchWithBucketCache.java | 211 +++++++++++++++++++++
.../hbase/io/hfile/bucket/TestBucketCache.java | 84 +++++---
.../io/hfile/bucket/TestBucketCachePersister.java | 9 +-
.../io/hfile/bucket/TestByteBufferIOEngine.java | 2 +-
.../io/hfile/bucket/TestPrefetchPersistence.java | 9 +-
.../hadoop/hbase/io/hfile/bucket/TestRAMCache.java | 2 +-
.../io/hfile/bucket/TestVerifyBucketCacheFile.java | 102 ++++++++--
17 files changed, 601 insertions(+), 200 deletions(-)
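The central change in this patch is that BucketCache now writes its persistence file to a temporary path and then renames it over the live file, so a crash midway through the write can no longer leave a truncated, unparseable cache map behind. A minimal, self-contained sketch of that write-then-rename pattern (not part of the patch; the class and method names below are illustrative only, and the patch itself uses File.renameTo and merely logs a warning if the rename fails):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public final class AtomicPersistSketch {
      /**
       * Writes the serialized cache index to a temporary file first, then renames it over the
       * live persistence file, so a crash during the write leaves the previous file intact.
       */
      static void persist(byte[] serializedIndex, Path persistencePath) throws IOException {
        Path temp = persistencePath.resolveSibling(persistencePath.getFileName() + "." + System.currentTimeMillis());
        Files.write(temp, serializedIndex);
        // ATOMIC_MOVE may not be supported by every filesystem; a plain REPLACE_EXISTING move
        // (or File.renameTo, as the patch does) is still an improvement over rewriting the
        // live file in place.
        Files.move(temp, persistencePath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
      }
    }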
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto
index 038c6ca3f04..ae1980fe51e 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/io/BucketCacheEntry.proto
@@ -32,6 +32,7 @@ message BucketCacheEntry {
map<int32, string> deserializers = 4;
required BackingMap backing_map = 5;
optional bytes checksum = 6;
+ map<string, bool> prefetched_files = 7;
}
message BackingMap {
@@ -71,6 +72,8 @@ message BucketEntry {
required int64 access_counter = 3;
required int32 deserialiser_index = 4;
required BlockPriority priority = 5;
+ required int64 cachedTime = 6;
+ optional int32 disk_size_with_header = 7;
}
enum BlockPriority {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 15c64c03d5e..57f91fa19f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -93,8 +93,6 @@ public class CacheConfig {
public static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
"hbase.hfile.drop.behind.compaction";
- public static final String PREFETCH_PERSISTENCE_PATH_KEY = "hbase.prefetch.file.list.path";
-
/**
* Configuration key to set interval for persisting bucket cache to disk.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 2c71ce9f484..f9c0ae59242 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -18,9 +18,13 @@
package org.apache.hadoop.hbase.io.hfile;
import java.io.IOException;
+import java.util.Optional;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,8 +39,14 @@ public class HFilePreadReader extends HFileReaderImpl {
public HFilePreadReader(ReaderContext context, HFileInfo fileInfo,
CacheConfig cacheConf,
Configuration conf) throws IOException {
super(context, fileInfo, cacheConf, conf);
+ final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
+ BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent(bc -> fileAlreadyCached
+ .setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : true));
// Prefetch file blocks upon open if requested
- if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()) {
+ if (
+ cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff()
+ && !fileAlreadyCached.booleanValue()
+ ) {
PrefetchExecutor.request(path, new Runnable() {
@Override
public void run() {
@@ -55,12 +65,36 @@ public class HFilePreadReader extends HFileReaderImpl {
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset,
end));
}
+ Optional<BucketCache> bucketCacheOptional =
+ BucketCache.getBuckedCacheFromCacheConfig(cacheConf);
// Don't use BlockIterator here, because it's designed to read load-on-open section.
long onDiskSizeOfNextBlock = -1;
while (offset < end) {
if (Thread.interrupted()) {
break;
}
+ // BucketCache can be persistent and resilient to restarts, so we check first if the
+ // block exists on its in-memory index, if so, we just update the offset and move on
+ // to the next block without actually going read all the way to the cache.
+ if (bucketCacheOptional.isPresent()) {
+ BucketCache cache = bucketCacheOptional.get();
+ BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
+ BucketEntry entry = cache.getBackingMap().get(cacheKey);
+ if (entry != null) {
+ cacheKey = new BlockCacheKey(name, offset);
+ entry = cache.getBackingMap().get(cacheKey);
+ if (entry == null) {
+ LOG.debug("No cache key {}, we'll read and cache it",
cacheKey);
+ } else {
+ offset += entry.getOnDiskSizeWithHeader();
+ LOG.debug("Found cache key {}. Skipping prefetch, the
block is already cached.",
+ cacheKey);
+ continue;
+ }
+ } else {
+ LOG.debug("No entry in the backing map for cache key {}",
cacheKey);
+ }
+ }
// Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
// the internal-to-hfileblock thread local which holds the overread that gets the
// next header, will not have happened...so, pass in the onDiskSize gotten from the
@@ -77,12 +111,15 @@ public class HFilePreadReader extends HFileReaderImpl {
block.release();
}
}
+ BucketCache.getBuckedCacheFromCacheConfig(cacheConf)
+ .ifPresent(bc -> bc.fileCacheCompleted(path.getName()));
+
} catch (IOException e) {
// IOExceptions are probably due to region closes (relocation, etc.)
if (LOG.isTraceEnabled()) {
LOG.trace("Prefetch " + getPathOffsetEndStr(path, offset, end),
e);
}
- } catch (Exception e) {
+ } catch (Throwable e) {
// Other exceptions are interesting
LOG.warn("Prefetch " + getPathOffsetEndStr(path, offset, end), e);
} finally {
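With the prefetched-file bookkeeping moved into the persistent BucketCache, the prefetch loop above can skip blocks whose entries survived a restart in the backing map instead of re-reading them from the filesystem. A condensed sketch of that skip decision, assuming only the accessors this patch adds (getBackingMap() and getOnDiskSizeWithHeader()); the helper class itself is illustrative and not part of the patch:

    import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
    import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
    import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;

    final class PrefetchSkipSketch {
      /**
       * Returns the offset of the next block to prefetch: if the block starting at the given
       * offset is already indexed by the persistent cache, jump past it; otherwise stay put
       * so the caller reads and caches it.
       */
      static long nextOffset(BucketCache cache, String hfileName, long offset) {
        BucketEntry entry = cache.getBackingMap().get(new BlockCacheKey(hfileName, offset));
        return entry != null ? offset + entry.getOnDiskSizeWithHeader() : offset;
      }
    }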
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
index d3064e066a1..02fbc12e85c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -17,11 +17,6 @@
*/
package org.apache.hadoop.hbase.io.hfile;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
@@ -42,8 +37,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.PersistentPrefetchProtos;
-
@InterfaceAudience.Private
public final class PrefetchExecutor {
@@ -51,16 +44,12 @@ public final class PrefetchExecutor {
/** Futures for tracking block prefetch activity */
private static final Map<Path, Future<?>> prefetchFutures = new
ConcurrentSkipListMap<>();
- /** Set of files for which prefetch is completed */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "MS_SHOULD_BE_FINAL")
- private static HashMap<String, Boolean> prefetchCompleted = new HashMap<>();
/** Executor pool shared among all HFiles for block prefetch */
private static final ScheduledExecutorService prefetchExecutorPool;
/** Delay before beginning prefetch */
private static final int prefetchDelayMillis;
/** Variation in prefetch delay times, to mitigate stampedes */
private static final float prefetchDelayVariation;
- static String prefetchedFileListPath;
static {
// Consider doing this on demand with a configuration passed in rather
// than in a static initializer.
@@ -90,13 +79,6 @@ public final class PrefetchExecutor {
+ HConstants.HREGION_COMPACTIONDIR_NAME.replace(".", "\\.") + Path.SEPARATOR_CHAR + ")");
public static void request(Path path, Runnable runnable) {
- if (prefetchCompleted != null) {
- if (isFilePrefetched(path.getName())) {
- LOG.info(
- "File has already been prefetched before the restart, so skipping
prefetch : " + path);
- return;
- }
- }
if (!prefetchPathExclude.matcher(path.toString()).find()) {
long delay;
if (prefetchDelayMillis > 0) {
@@ -122,8 +104,9 @@ public final class PrefetchExecutor {
public static void complete(Path path) {
prefetchFutures.remove(path);
- prefetchCompleted.put(path.getName(), true);
- LOG.debug("Prefetch completed for {}", path.getName());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Prefetch completed for {}", path.getName());
+ }
}
public static void cancel(Path path) {
@@ -134,8 +117,6 @@ public final class PrefetchExecutor {
prefetchFutures.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
- LOG.debug("Removing filename from the prefetched persistence list: {}",
path.getName());
- removePrefetchedFileWhileEvict(path.getName());
}
public static boolean isCompleted(Path path) {
@@ -146,70 +127,6 @@ public final class PrefetchExecutor {
return true;
}
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
- justification = "false positive, try-with-resources ensures close is called.")
- public static void persistToFile(String path) throws IOException {
- prefetchedFileListPath = path;
- if (prefetchedFileListPath == null) {
- LOG.info("Exception while persisting prefetch!");
- throw new IOException("Error persisting prefetched HFiles set!");
- }
- if (!prefetchCompleted.isEmpty()) {
- try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, false)) {
- PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
- }
- }
- }
-
- public static void retrieveFromFile(String path) throws IOException {
- prefetchedFileListPath = path;
- File prefetchPersistenceFile = new File(prefetchedFileListPath);
- if (!prefetchPersistenceFile.exists()) {
- LOG.warn("Prefetch persistence file does not exist!");
- return;
- }
- LOG.info("Retrieving from prefetch persistence file " + path);
- assert (prefetchedFileListPath != null);
- try (FileInputStream fis = deleteFileOnClose(prefetchPersistenceFile)) {
- PersistentPrefetchProtos.PrefetchedHfileName proto =
- PersistentPrefetchProtos.PrefetchedHfileName.parseDelimitedFrom(fis);
- Map<String, Boolean> protoPrefetchedFilesMap =
proto.getPrefetchedFilesMap();
- prefetchCompleted.putAll(protoPrefetchedFilesMap);
- }
- }
-
- private static FileInputStream deleteFileOnClose(final File file) throws IOException {
- return new FileInputStream(file) {
- private File myFile;
-
- private FileInputStream init(File file) {
- myFile = file;
- return this;
- }
-
- @Override
- public void close() throws IOException {
- if (myFile == null) {
- return;
- }
-
- super.close();
- if (!myFile.delete()) {
- throw new IOException("Failed deleting persistence file " +
myFile.getAbsolutePath());
- }
- myFile = null;
- }
- }.init(file);
- }
-
- public static void removePrefetchedFileWhileEvict(String hfileName) {
- prefetchCompleted.remove(hfileName);
- }
-
- public static boolean isFilePrefetched(String hfileName) {
- return prefetchCompleted.containsKey(hfileName);
- }
-
private PrefetchExecutor() {
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 14c4c44ee16..bc5e7e7c9b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
-import static org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
import java.io.File;
import java.io.FileInputStream;
@@ -32,6 +31,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -52,6 +52,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
@@ -62,12 +63,13 @@ import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.BlockCacheUtil;
import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
+import org.apache.hadoop.hbase.io.hfile.CombinedBlockCache;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@@ -143,8 +145,14 @@ public class BucketCache implements BlockCache, HeapSize {
// Store the block in this map before writing it to cache
transient final RAMCache ramCache;
+
// In this map, store the block's meta data like offset, length
- transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap;
+ transient Map<BlockCacheKey, BucketEntry> backingMap;
+
+ /** Set of files for which prefetch is completed */
+ final Map<String, Boolean> fullyCachedFiles = new ConcurrentHashMap<>();
+
+ private BucketCachePersister cachePersister;
/**
* Flag if the cache is enabled or not... We shut it off if there are IO
errors for some time, so
@@ -177,9 +185,6 @@ public class BucketCache implements BlockCache, HeapSize {
private static final int DEFAULT_CACHE_WAIT_TIME = 50;
private final BucketCacheStats cacheStats = new BucketCacheStats();
-
- /** BucketCache persister thread */
- private BucketCachePersister cachePersister;
private final String persistencePath;
static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
private final long cacheCapacity;
@@ -239,8 +244,6 @@ public class BucketCache implements BlockCache, HeapSize {
/** In-memory bucket size */
private float memoryFactor;
- private String prefetchedFileListPath;
-
private long bucketcachePersistInterval;
private static final String FILE_VERIFY_ALGORITHM =
@@ -293,7 +296,6 @@ public class BucketCache implements BlockCache, HeapSize {
this.memoryFactor = conf.getFloat(MEMORY_FACTOR_CONFIG_NAME,
DEFAULT_MEMORY_FACTOR);
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
- this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
this.bucketcachePersistInterval =
conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
sanityCheckConfigs();
@@ -320,11 +322,15 @@ public class BucketCache implements BlockCache, HeapSize {
this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
- if (ioEngine.isPersistent() && persistencePath != null) {
- startBucketCachePersisterThread();
+ if (isCachePersistent()) {
+ if (ioEngine instanceof FileIOEngine) {
+ startBucketCachePersisterThread();
+ }
try {
retrieveFromFile(bucketSizes);
} catch (IOException ioex) {
+ backingMap.clear();
+ fullyCachedFiles.clear();
LOG.error("Can't restore from file[" + persistencePath + "] because of
", ioex);
}
}
@@ -429,7 +435,7 @@ public class BucketCache implements BlockCache, HeapSize {
}
public boolean isCachePersistenceEnabled() {
- return (prefetchedFileListPath != null) && (persistencePath != null);
+ return persistencePath != null;
}
/**
@@ -504,8 +510,8 @@ public class BucketCache implements BlockCache, HeapSize {
}
LOG.trace("Caching key={}, item={}", cacheKey, cachedItem);
// Stuff the entry into the RAM cache so it can get drained to the
persistent store
- RAMQueueEntry re =
- new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(), inMemory);
+ RAMQueueEntry re = new RAMQueueEntry(cacheKey, cachedItem, accessCount.incrementAndGet(),
+ inMemory, isCachePersistent() && ioEngine instanceof FileIOEngine);
/**
* Don't use ramCache.put(cacheKey, re) here. because there may be a existing entry with same
* key in ramCache, the heap size of bucket cache need to update if replacing entry from
@@ -589,6 +595,12 @@ public class BucketCache implements BlockCache, HeapSize {
}
return cachedBlock;
}
+ } catch (HBaseIOException hioex) {
+ // When using file io engine persistent cache,
+ // the cache map state might differ from the actual cache. If we reach this block,
+ // we should remove the cache key entry from the backing map
+ backingMap.remove(key);
+ LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
} catch (IOException ioex) {
LOG.error("Failed reading block " + key + " from bucket cache", ioex);
checkIOErrorIsTolerated();
@@ -616,13 +628,15 @@ public class BucketCache implements BlockCache, HeapSize {
cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
}
if (ioEngine.isPersistent()) {
- if (prefetchedFileListPath != null) {
- PrefetchExecutor.removePrefetchedFileWhileEvict(cacheKey.getHfileName());
- }
+ fullyCachedFiles.remove(cacheKey.getHfileName());
setCacheInconsistent(true);
}
}
+ public void fileCacheCompleted(String fileName) {
+ fullyCachedFiles.put(fileName, true);
+ }
+
/**
* Free the {{@link BucketEntry} actually,which could only be invoked when
the
* {@link BucketEntry#refCnt} becoming 0.
@@ -1252,18 +1266,24 @@ public class BucketCache implements BlockCache, HeapSize {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
justification = "false positive, try-with-resources ensures close is called.")
void persistToFile() throws IOException {
- if (!ioEngine.isPersistent()) {
+ if (!isCachePersistent()) {
throw new IOException("Attempt to persist non-persistent cache
mappings!");
}
- try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
+ File tempPersistencePath = new File(persistencePath +
EnvironmentEdgeManager.currentTime());
+ try (FileOutputStream fos = new FileOutputStream(tempPersistencePath,
false)) {
fos.write(ProtobufMagic.PB_MAGIC);
BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
}
- if (prefetchedFileListPath != null) {
- PrefetchExecutor.persistToFile(prefetchedFileListPath);
+ if (!tempPersistencePath.renameTo(new File(persistencePath))) {
+ LOG.warn("Failed to commit cache persistent file. We might lose cached
blocks if "
+ + "RS crashes/restarts before we successfully checkpoint again.");
}
}
+ private boolean isCachePersistent() {
+ return ioEngine.isPersistent() && persistencePath != null;
+ }
+
/**
* @see #persistToFile()
*/
@@ -1273,9 +1293,6 @@ public class BucketCache implements BlockCache, HeapSize {
return;
}
assert !cacheEnabled;
- if (prefetchedFileListPath != null) {
- PrefetchExecutor.retrieveFromFile(prefetchedFileListPath);
- }
try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
int pblen = ProtobufMagic.lengthOfPBMagic();
@@ -1358,16 +1375,37 @@ public class BucketCache implements BlockCache, HeapSize {
}
private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
+ backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
+ this::createRecycler);
+ fullyCachedFiles.clear();
+ fullyCachedFiles.putAll(proto.getPrefetchedFilesMap());
if (proto.hasChecksum()) {
- ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
- algorithm);
+ try {
+ ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
+ algorithm);
+ } catch (IOException e) {
+ LOG.warn("Checksum for cache file failed. "
+ + "We need to validate each cache key in the backing map. This may take some time...");
+ long startTime = EnvironmentEdgeManager.currentTime();
+ int totalKeysOriginally = backingMap.size();
+ for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
+ try {
+ ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
+ } catch (IOException e1) {
+ LOG.debug("Check for key {} failed. Removing it from map.", keyEntry.getKey());
+ backingMap.remove(keyEntry.getKey());
+ fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
+ }
+ }
+ LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
+ totalKeysOriginally, backingMap.size(),
+ (EnvironmentEdgeManager.currentTime() - startTime));
+ }
+ }
} else {
// if has not checksum, it means the persistence file is old format
LOG.info("Persistent file is old format, it does not support verifying
file integrity!");
}
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
- backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
- this::createRecycler);
}
/**
@@ -1403,6 +1441,7 @@ public class BucketCache implements BlockCache, HeapSize {
if (!ioEngine.isPersistent() || persistencePath == null) {
// If persistent ioengine and a path, we will serialize out the
backingMap.
this.backingMap.clear();
+ this.fullyCachedFiles.clear();
}
}
@@ -1417,7 +1456,9 @@ public class BucketCache implements BlockCache, HeapSize {
LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent()
+ "; path to write="
+ persistencePath);
if (ioEngine.isPersistent() && persistencePath != null) {
- cachePersister.interrupt();
+ if (cachePersister != null) {
+ cachePersister.interrupt();
+ }
try {
join();
persistToFile();
@@ -1429,6 +1470,17 @@ public class BucketCache implements BlockCache, HeapSize
{
}
}
+ /**
+ * Needed mostly for UTs that might run in the same VM and create different BucketCache instances
+ * on different UT methods.
+ */
+ @Override
+ protected void finalize() {
+ if (cachePersister != null && !cachePersister.isInterrupted()) {
+ cachePersister.interrupt();
+ }
+ }
+
@Override
public CacheStats getStats() {
return cacheStats;
@@ -1485,7 +1537,7 @@ public class BucketCache implements BlockCache, HeapSize {
*/
@Override
public int evictBlocksByHfileName(String hfileName) {
- PrefetchExecutor.removePrefetchedFileWhileEvict(hfileName);
+ this.fullyCachedFiles.remove(hfileName);
Set<BlockCacheKey> keySet = blocksByHFile.subSet(new
BlockCacheKey(hfileName, Long.MIN_VALUE),
true, new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
@@ -1556,12 +1608,15 @@ public class BucketCache implements BlockCache,
HeapSize {
private final Cacheable data;
private long accessCounter;
private boolean inMemory;
+ private boolean isCachePersistent;
- RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
boolean inMemory) {
+ RAMQueueEntry(BlockCacheKey bck, Cacheable data, long accessCounter,
boolean inMemory,
+ boolean isCachePersistent) {
this.key = bck;
this.data = data;
this.accessCounter = accessCounter;
this.inMemory = inMemory;
+ this.isCachePersistent = isCachePersistent;
}
public Cacheable getData() {
@@ -1591,12 +1646,19 @@ public class BucketCache implements BlockCache,
HeapSize {
if (len == 0) {
return null;
}
+ if (isCachePersistent && data instanceof HFileBlock) {
+ len += Long.BYTES; // we need to record the cache time for consistency check in case of
+ // recovery
+ }
long offset = alloc.allocateBlock(len);
boolean succ = false;
BucketEntry bucketEntry = null;
try {
- bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory, createRecycler,
- getByteBuffAllocator());
+ int diskSizeWithHeader = (data instanceof HFileBlock)
+ ? ((HFileBlock) data).getOnDiskSizeWithHeader()
+ : data.getSerializedLength();
+ bucketEntry = new BucketEntry(offset, len, diskSizeWithHeader, accessCounter, inMemory,
+ createRecycler, getByteBuffAllocator());
bucketEntry.setDeserializerReference(data.getDeserializer());
if (data instanceof HFileBlock) {
// If an instance of HFileBlock, save on some allocations.
@@ -1604,7 +1666,16 @@ public class BucketCache implements BlockCache, HeapSize
{
ByteBuff sliceBuf = block.getBufferReadOnly();
block.getMetaData(metaBuff);
ioEngine.write(sliceBuf, offset);
- ioEngine.write(metaBuff, offset + len - metaBuff.limit());
+ // adds the cache time after the block and metadata part
+ if (isCachePersistent) {
+ ioEngine.write(metaBuff, offset + len - metaBuff.limit() - Long.BYTES);
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong(bucketEntry.getCachedTime());
+ buffer.rewind();
+ ioEngine.write(buffer, (offset + len - Long.BYTES));
+ } else {
+ ioEngine.write(metaBuff, offset + len - metaBuff.limit());
+ }
} else {
// Only used for testing.
ByteBuffer bb = ByteBuffer.allocate(len);
@@ -1760,6 +1831,10 @@ public class BucketCache implements BlockCache, HeapSize
{
return memoryFactor;
}
+ public String getPersistencePath() {
+ return persistencePath;
+ }
+
/**
* Wrapped the delegate ConcurrentMap with maintaining its block's reference
count.
*/
@@ -1837,4 +1912,28 @@ public class BucketCache implements BlockCache, HeapSize
{
}
}
}
+
+ public Map<BlockCacheKey, BucketEntry> getBackingMap() {
+ return backingMap;
+ }
+
+ public Map<String, Boolean> getFullyCachedFiles() {
+ return fullyCachedFiles;
+ }
+
+ public static Optional<BucketCache>
getBuckedCacheFromCacheConfig(CacheConfig cacheConf) {
+ if (cacheConf.getBlockCache().isPresent()) {
+ BlockCache bc = cacheConf.getBlockCache().get();
+ if (bc instanceof CombinedBlockCache) {
+ BlockCache l2 = ((CombinedBlockCache) bc).getSecondLevelCache();
+ if (l2 instanceof BucketCache) {
+ return Optional.of((BucketCache) l2);
+ }
+ } else if (bc instanceof BucketCache) {
+ return Optional.of((BucketCache) bc);
+ }
+ }
+ return Optional.empty();
+ }
+
}
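The other half of the fix is making a restored backing map verifiable: when the cache is persistent, each block written through the FileIOEngine is followed by the entry's cachedTime (an extra Long.BYTES), so recovery can later confirm that the bytes at an offset still belong to the entry that points at them. A rough sketch of that on-disk layout, independent of the HBase classes (hypothetical helper, for illustration only):

    import java.nio.ByteBuffer;

    final class CachedTimeLayoutSketch {
      /**
       * Illustrative layout of one persisted block when persistence is enabled:
       * [ block bytes | block metadata | 8-byte cachedTime ]
       * The trailing long is what later allows a consistency check against the BucketEntry.
       */
      static ByteBuffer appendCachedTime(ByteBuffer blockAndMetadata, long cachedTime) {
        ByteBuffer out = ByteBuffer.allocate(blockAndMetadata.remaining() + Long.BYTES);
        out.put(blockAndMetadata);
        out.putLong(cachedTime);
        out.flip();
        return out;
      }
    }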
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
index a04a32bfe64..c93dac8a572 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketEntry.java
@@ -43,13 +43,15 @@ import org.apache.yetus.audience.InterfaceAudience;
* bytes gives us 256TB or so.
*/
@InterfaceAudience.Private
-class BucketEntry implements HBaseReferenceCounted {
+public class BucketEntry implements HBaseReferenceCounted {
// access counter comparator, descending order
static final Comparator<BucketEntry> COMPARATOR =
Comparator.comparingLong(BucketEntry::getAccessCounter).reversed();
private int offsetBase;
private int length;
+
+ private int onDiskSizeWithHeader;
private byte offset1;
/**
@@ -91,24 +93,32 @@ class BucketEntry implements HBaseReferenceCounted {
/**
* Time this block was cached. Presumes we are created just before we are
added to the cache.
*/
- private final long cachedTime = System.nanoTime();
+ private long cachedTime = System.nanoTime();
/**
* @param createRecycler used to free this {@link BucketEntry} when {@link
BucketEntry#refCnt}
* becoming 0. NOTICE that {@link
ByteBuffAllocator#NONE} could only be used
* for test.
*/
- BucketEntry(long offset, int length, long accessCounter, boolean inMemory,
- Function<BucketEntry, Recycler> createRecycler, ByteBuffAllocator
allocator) {
+ BucketEntry(long offset, int length, int onDiskSizeWithHeader, long
accessCounter,
+ boolean inMemory, Function<BucketEntry, Recycler> createRecycler,
ByteBuffAllocator allocator) {
+ this(offset, length, onDiskSizeWithHeader, accessCounter,
System.nanoTime(), inMemory,
+ createRecycler, allocator);
+ }
+
+ BucketEntry(long offset, int length, int onDiskSizeWithHeader, long
accessCounter,
+ long cachedTime, boolean inMemory, Function<BucketEntry, Recycler>
createRecycler,
+ ByteBuffAllocator allocator) {
if (createRecycler == null) {
throw new IllegalArgumentException("createRecycler could not be null!");
}
setOffset(offset);
this.length = length;
+ this.onDiskSizeWithHeader = onDiskSizeWithHeader;
this.accessCounter = accessCounter;
+ this.cachedTime = cachedTime;
this.priority = inMemory ? BlockPriority.MEMORY : BlockPriority.MULTI;
this.refCnt = RefCnt.create(createRecycler.apply(this));
-
this.markedAsEvicted = new AtomicBoolean(false);
this.allocator = allocator;
}
@@ -159,10 +169,14 @@ class BucketEntry implements HBaseReferenceCounted {
return this.priority;
}
- long getCachedTime() {
+ public long getCachedTime() {
return cachedTime;
}
+ public int getOnDiskSizeWithHeader() {
+ return onDiskSizeWithHeader;
+ }
+
/**
* The {@link BucketCache} will try to release its reference to this
BucketEntry many times. we
* must make sure the idempotent, otherwise it'll decrease the RPC's
reference count in advance,
@@ -239,7 +253,7 @@ class BucketEntry implements HBaseReferenceCounted {
* also release its refCnt (case.1 will do this) and no other rpc reference,
then it will free the
* area in bucketAllocator. <br>
* 3.evict those block without any rpc reference if cache size exceeded.
we'll only free those
- * blocks with zero rpc reference count, as the {@link
BucketEntry#markStaleAsEvicted()} do.
+ * blocks with zero rpc reference count.
* @return true to indicate we've decreased to zero and do the de-allocation.
*/
@Override
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index ff4e90b8865..8830e5d3255 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -45,6 +45,7 @@ final class BucketProtoUtils {
.setIoClass(cache.ioEngine.getClass().getName())
.setMapClass(cache.backingMap.getClass().getName())
.putAllDeserializers(CacheableDeserializerIdManager.save())
+ .putAllPrefetchedFiles(cache.fullyCachedFiles)
.setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
.setChecksum(ByteString
.copyFrom(((PersistentIOEngine)
cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
@@ -99,8 +100,10 @@ final class BucketProtoUtils {
private static BucketCacheProtos.BucketEntry toPB(BucketEntry entry) {
return BucketCacheProtos.BucketEntry.newBuilder().setOffset(entry.offset())
-
.setLength(entry.getLength()).setDeserialiserIndex(entry.deserializerIndex)
-
.setAccessCounter(entry.getAccessCounter()).setPriority(toPB(entry.getPriority())).build();
+ .setCachedTime(entry.getCachedTime()).setLength(entry.getLength())
+ .setDiskSizeWithHeader(entry.getOnDiskSizeWithHeader())
+
.setDeserialiserIndex(entry.deserializerIndex).setAccessCounter(entry.getAccessCounter())
+ .setPriority(toPB(entry.getPriority())).build();
}
private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) {
@@ -128,7 +131,8 @@ final class BucketProtoUtils {
// TODO:We use ByteBuffAllocator.HEAP here, because we could not get the
ByteBuffAllocator
// which created by RpcServer elegantly.
BucketEntry value = new BucketEntry(protoValue.getOffset(),
protoValue.getLength(),
- protoValue.getAccessCounter(),
+ protoValue.getDiskSizeWithHeader(), protoValue.getAccessCounter(),
+ protoValue.getCachedTime(),
protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory,
createRecycler,
ByteBuffAllocator.HEAP);
// This is the deserializer that we stored
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 370343b1b25..38f9db04b6d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -26,6 +26,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -49,6 +50,7 @@ public class FileIOEngine extends PersistentIOEngine {
private final long sizePerFile;
private final long capacity;
+ private boolean maintainPersistence;
private FileReadAccessor readAccessor = new FileReadAccessor();
private FileWriteAccessor writeAccessor = new FileWriteAccessor();
@@ -59,6 +61,7 @@ public class FileIOEngine extends PersistentIOEngine {
this.sizePerFile = capacity / filePaths.length;
this.capacity = this.sizePerFile * filePaths.length;
this.fileChannels = new FileChannel[filePaths.length];
+ this.maintainPersistence = maintainPersistence;
if (!maintainPersistence) {
for (String filePath : filePaths) {
File file = new File(filePath);
@@ -145,10 +148,42 @@ public class FileIOEngine extends PersistentIOEngine {
throw ioe;
}
}
- dstBuff.rewind();
+ if (maintainPersistence) {
+ dstBuff.position(length - Long.BYTES);
+ long cachedNanoTime = dstBuff.getLong();
+ if (be.getCachedTime() != cachedNanoTime) {
+ dstBuff.release();
+ throw new HBaseIOException("The cached time recorded within the cached
block differs "
+ + "from its bucket entry, so it might not be the same.");
+ }
+ dstBuff.rewind();
+ dstBuff.limit(length - Long.BYTES);
+ dstBuff = dstBuff.slice();
+ } else {
+ dstBuff.rewind();
+ }
return be.wrapAsCacheable(dstBuff);
}
+ void checkCacheTime(BucketEntry be) throws IOException {
+ long offset = be.offset();
+ int length = be.getLength();
+ ByteBuff dstBuff = be.allocator.allocate(Long.BYTES);
+ try {
+ accessFile(readAccessor, dstBuff, (offset + length - Long.BYTES));
+ } catch (IOException ioe) {
+ dstBuff.release();
+ throw ioe;
+ }
+ dstBuff.rewind();
+ long cachedNanoTime = dstBuff.getLong();
+ if (be.getCachedTime() != cachedNanoTime) {
+ dstBuff.release();
+ throw new HBaseIOException("The cached time recorded within the cached
block differs "
+ + "from its bucket entry, so it might not be the same.");
+ }
+ }
+
void closeFileChannels() {
for (FileChannel fileChannel : fileChannels) {
try {
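On the read path, and in checkCacheTime() during recovery, the trailing cachedTime is compared with the value stored in the BucketEntry; a mismatch means the slot in the cache file no longer holds the block the entry describes, and the entry is dropped. A minimal sketch of that validation, using a hypothetical helper rather than the FileIOEngine internals:

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hbase.HBaseIOException;

    final class CachedTimeCheckSketch {
      /**
       * Compares the 8-byte cachedTime trailer of a persisted block with the value the
       * bucket entry expects; a mismatch indicates the slot was reused or partially written.
       */
      static void verify(ByteBuffer blockWithTrailer, long expectedCachedTime) throws HBaseIOException {
        long recorded = blockWithTrailer.getLong(blockWithTrailer.limit() - Long.BYTES);
        if (recorded != expectedCachedTime) {
          throw new HBaseIOException("Cached time mismatch; dropping the stale bucket entry");
        }
      }
    }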
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
index 66b2ca73ded..eb3e3cc61f4 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockEvictionOnRegionMovement.java
@@ -79,7 +79,6 @@ public class TestBlockEvictionOnRegionMovement {
conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
conf.setInt("hbase.bucketcache.size", 400);
conf.set("hbase.bucketcache.persistent.path", testDir +
"/bucket.persistence");
- conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir +
"/prefetch.persistence");
conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 100);
conf.setBoolean(CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY, true);
zkCluster = TEST_UTIL.startMiniZKCluster();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
index b10186996ed..64db9158333 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
@@ -75,7 +75,6 @@ public class TestPrefetchRSClose {
conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:" + testDir + "/bucket.cache");
conf.setInt("hbase.bucketcache.size", 400);
conf.set("hbase.bucketcache.persistent.path", testDir +
"/bucket.persistence");
- conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, testDir +
"/prefetch.persistence");
zkCluster = TEST_UTIL.startMiniZKCluster();
cluster = TEST_UTIL.startMiniHBaseCluster(option);
assertEquals(2, cluster.getRegionServerThreads().size());
@@ -114,18 +113,15 @@ public class TestPrefetchRSClose {
// Default interval for cache persistence is 1000ms. So after 1000ms, both
the persistence files
// should exist.
assertTrue(new File(testDir + "/bucket.persistence").exists());
- assertTrue(new File(testDir + "/prefetch.persistence").exists());
// Stop the RS
cluster.stopRegionServer(0);
LOG.info("Stopped Region Server 0.");
Thread.sleep(1000);
assertTrue(new File(testDir + "/bucket.persistence").exists());
- assertTrue(new File(testDir + "/prefetch.persistence").exists());
// Start the RS and validate
cluster.startRegionServer();
- assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertFalse(new File(testDir + "/bucket.persistence").exists());
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
new file mode 100644
index 00000000000..e4330308243
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestPrefetchWithBucketCache {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestPrefetchWithBucketCache.class);
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestPrefetchWithBucketCache.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+ private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length
- 2;
+ private static final int DATA_BLOCK_SIZE = 2048;
+ private static final int NUM_KV = 100;
+
+ private Configuration conf;
+ private CacheConfig cacheConf;
+ private FileSystem fs;
+ private BlockCache blockCache;
+
+ @Before
+ public void setUp() throws IOException {
+ conf = TEST_UTIL.getConfiguration();
+ conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
+ fs = HFileSystem.get(conf);
+ File testDir = new File(name.getMethodName());
+ testDir.mkdir();
+ conf.set(BUCKET_CACHE_IOENGINE_KEY, "file:/" + testDir.getAbsolutePath() +
"/bucket.cache");
+ conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
+ blockCache = BlockCacheFactory.createBlockCache(conf);
+ cacheConf = new CacheConfig(conf, blockCache);
+ }
+
+ @After
+ public void tearDown() {
+ File cacheFile = new File(name.getMethodName() + "/bucket.cache");
+ File dir = new File(name.getMethodName());
+ cacheFile.delete();
+ dir.delete();
+ }
+
+ @Test
+ public void testPrefetchDoesntOverwork() throws Exception {
+ Path storeFile = writeStoreFile("TestPrefetchDoesntOverwork");
+ // Prefetches the file blocks
+ LOG.debug("First read should prefetch the blocks.");
+ readStoreFile(storeFile);
+ BucketCache bc =
BucketCache.getBuckedCacheFromCacheConfig(cacheConf).get();
+ // Our file should have 6 DATA blocks. We should wait for all of them to
be cached
+ Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
+ Map<BlockCacheKey, BucketEntry> snapshot =
ImmutableMap.copyOf(bc.getBackingMap());
+ // Reads file again and check we are not prefetching it again
+ LOG.debug("Second read, no prefetch should happen here.");
+ readStoreFile(storeFile);
+ // Makes sure the cache hasn't changed
+ snapshot.entrySet().forEach(e -> {
+ BucketEntry entry = bc.getBackingMap().get(e.getKey());
+ assertNotNull(entry);
+ assertEquals(e.getValue().getCachedTime(), entry.getCachedTime());
+ });
+ // forcibly removes first block from the bc backing map, in order to cause
it to be cached again
+ BlockCacheKey key = snapshot.keySet().stream().findFirst().get();
+ LOG.debug("removing block {}", key);
+ bc.getBackingMap().remove(key);
+ bc.getFullyCachedFiles().remove(storeFile.getName());
+ assertTrue(snapshot.size() > bc.getBackingMap().size());
+ LOG.debug("Third read should prefetch again, as we removed one block for
the file.");
+ readStoreFile(storeFile);
+ Waiter.waitFor(conf, 300, () -> snapshot.size() ==
bc.getBackingMap().size());
+ assertTrue(snapshot.get(key).getCachedTime() <
bc.getBackingMap().get(key).getCachedTime());
+ }
+
+ private void readStoreFile(Path storeFilePath) throws Exception {
+ readStoreFile(storeFilePath, (r, o) -> {
+ HFileBlock block = null;
+ try {
+ block = r.readBlock(o, -1, false, true, false, true, null, null);
+ } catch (IOException e) {
+ fail(e.getMessage());
+ }
+ return block;
+ }, (key, block) -> {
+ boolean isCached = blockCache.getBlock(key, true, false, true) != null;
+ if (
+ block.getBlockType() == BlockType.DATA || block.getBlockType() ==
BlockType.ROOT_INDEX
+ || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+ ) {
+ assertTrue(isCached);
+ }
+ });
+ }
+
+ private void readStoreFile(Path storeFilePath,
+ BiFunction<HFile.Reader, Long, HFileBlock> readFunction,
+ BiConsumer<BlockCacheKey, HFileBlock> validationFunction) throws Exception
{
+ // Open the file
+ HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf,
true, conf);
+
+ while (!reader.prefetchComplete()) {
+ // Sleep for a bit
+ Thread.sleep(1000);
+ }
+ long offset = 0;
+ while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+ HFileBlock block = readFunction.apply(reader, offset);
+ BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(),
offset);
+ validationFunction.accept(blockCacheKey, block);
+ offset += block.getOnDiskSizeWithHeader();
+ }
+ }
+
+ private Path writeStoreFile(String fname) throws IOException {
+ HFileContext meta = new
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+ return writeStoreFile(fname, meta);
+ }
+
+ private Path writeStoreFile(String fname, HFileContext context) throws
IOException {
+ Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+ StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
+ .withOutputDir(storeFileParentDir).withFileContext(context).build();
+ Random rand = ThreadLocalRandom.current();
+ final int rowLen = 32;
+ for (int i = 0; i < NUM_KV; ++i) {
+ byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
+ byte[] v = RandomKeyValueUtil.randomValue(rand);
+ int cfLen = rand.nextInt(k.length - rowLen + 1);
+ KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen +
cfLen,
+ k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v,
0, v.length);
+ sfw.append(kv);
+ }
+
+ sfw.close();
+ return sfw.getPath();
+ }
+
+ public static KeyValue.Type generateKeyType(Random rand) {
+ if (rand.nextBoolean()) {
+ // Let's make half of KVs puts.
+ return KeyValue.Type.Put;
+ } else {
+ KeyValue.Type keyType = KeyValue.Type.values()[1 +
rand.nextInt(NUM_VALID_KEY_TYPES)];
+ if (keyType == KeyValue.Type.Minimum || keyType ==
KeyValue.Type.Maximum) {
+ throw new RuntimeException("Generated an invalid key type: " + keyType
+ ". "
+ + "Probably the layout of KeyValue.Type has changed.");
+ }
+ return keyType;
+ }
+ }
+
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index ad381a665c3..0cbafedc7c5 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -57,6 +57,7 @@ import
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
@@ -298,12 +299,17 @@ public class TestBucketCache {
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
String persistencePath = testDir + "/bucket.persistence";
- BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
- smallBucketSizes, writeThreads, writerQLen, persistencePath);
- assertFalse(new File(persistencePath).exists());
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
- HBASE_TESTING_UTILITY.cleanupTestDir();
+ BucketCache bucketCache = null;
+ try {
+ bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
+ smallBucketSizes, writeThreads, writerQLen, persistencePath);
+ assertFalse(new File(persistencePath).exists());
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ } finally {
+ bucketCache.shutdown();
+ HBASE_TESTING_UTILITY.cleanupTestDir();
+ }
}
@Test
@@ -319,21 +325,28 @@ public class TestBucketCache {
final String ioEngineName = "pmem:" + testDir + "/bucket.cache";
testRetrievalUtils(testDir, ioEngineName);
int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
- String persistencePath = testDir + "/bucket.persistence";
- BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
- smallBucketSizes, writeThreads, writerQLen, persistencePath);
- assertFalse(new File(persistencePath).exists());
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
- HBASE_TESTING_UTILITY.cleanupTestDir();
+ String persistencePath = testDir + "/bucket.persistence" +
EnvironmentEdgeManager.currentTime();
+ BucketCache bucketCache = null;
+ try {
+ bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
+ smallBucketSizes, writeThreads, writerQLen, persistencePath);
+ assertFalse(new File(persistencePath).exists());
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ } finally {
+ bucketCache.shutdown();
+ HBASE_TESTING_UTILITY.cleanupTestDir();
+ }
}
private void testRetrievalUtils(Path testDir, String ioEngineName)
throws IOException, InterruptedException {
- final String persistencePath = testDir + "/bucket.persistence";
- BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
- constructedBlockSizes, writeThreads, writerQLen, persistencePath);
+ final String persistencePath =
+ testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
+ BucketCache bucketCache = null;
try {
+ bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, persistencePath);
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
@@ -353,7 +366,9 @@ public class TestBucketCache {
assertFalse(new File(persistencePath).exists());
assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
} finally {
- bucketCache.shutdown();
+ if (bucketCache != null) {
+ bucketCache.shutdown();
+ }
}
assertTrue(new File(persistencePath).exists());
}
@@ -382,12 +397,17 @@ public class TestBucketCache {
testRetrievalUtils(testDirInitial, ioEngineName);
int[] smallBucketSizes = new int[] { 3 * 1024, 5 * 1024 };
String persistencePath = testDirInitial + "/bucket.persistence";
- BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
- smallBucketSizes, writeThreads, writerQLen, persistencePath);
- assertFalse(new File(persistencePath).exists());
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
- HBASE_TESTING_UTILITY.cleanupTestDir();
+ BucketCache bucketCache = null;
+ try {
+ bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
+ smallBucketSizes, writeThreads, writerQLen, persistencePath);
+ assertFalse(new File(persistencePath).exists());
+ assertEquals(0, bucketCache.getAllocator().getUsedSize());
+ assertEquals(0, bucketCache.backingMap.size());
+ } finally {
+ bucketCache.shutdown();
+ HBASE_TESTING_UTILITY.cleanupTestDir();
+ }
}
@Test
@@ -572,7 +592,7 @@ public class TestBucketCache {
// This number is picked because it produces negative output if the values
isn't ensured to be
// positive. See HBASE-18757 for more information.
long testValue = 549888460800L;
- BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10L, true,
(entry) -> {
+ BucketEntry bucketEntry = new BucketEntry(testValue, 10, 10, 10L, true,
(entry) -> {
return ByteBuffAllocator.NONE;
}, ByteBuffAllocator.HEAP);
assertEquals(testValue, bucketEntry.offset());
@@ -701,8 +721,8 @@ public class TestBucketCache {
HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
HFileBlock blk2 = new HFileBlock(BlockType.DATA, size, size, -1,
ByteBuff.wrap(buf),
HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
- RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false);
- RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false);
+ RAMQueueEntry re1 = new RAMQueueEntry(key1, blk1, 1, false, false);
+ RAMQueueEntry re2 = new RAMQueueEntry(key1, blk2, 1, false, false);
assertFalse(cache.containsKey(key1));
assertNull(cache.putIfAbsent(key1, re1));
@@ -749,7 +769,7 @@ public class TestBucketCache {
BucketAllocator allocator = new BucketAllocator(availableSpace, null);
BlockCacheKey key = new BlockCacheKey("dummy", 1L);
- RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true);
+ RAMQueueEntry re = new RAMQueueEntry(key, block, 1, true, false);
Assert.assertEquals(0, allocator.getUsedSize());
try {
@@ -768,13 +788,14 @@ public class TestBucketCache {
*/
@Test
public void testFreeBucketEntryRestoredFromFile() throws Exception {
+ BucketCache bucketCache = null;
try {
final Path dataTestDir = createAndGetTestDir();
String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
- BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
+ bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, persistencePath);
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);
@@ -809,19 +830,21 @@ public class TestBucketCache {
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
} finally {
+ bucketCache.shutdown();
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}
@Test
public void testBlockAdditionWaitWhenCache() throws Exception {
+ BucketCache bucketCache = null;
try {
final Path dataTestDir = createAndGetTestDir();
String ioEngineName = "file:" + dataTestDir + "/bucketNoRecycler.cache";
String persistencePath = dataTestDir + "/bucketNoRecycler.persistence";
- BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
+ bucketCache = new BucketCache(ioEngineName, capacitySize,
constructedBlockSize,
constructedBlockSizes, 1, 1, persistencePath);
long usedByteSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedByteSize);
@@ -864,6 +887,9 @@ public class TestBucketCache {
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
} finally {
+ if (bucketCache != null) {
+ bucketCache.shutdown();
+ }
HBASE_TESTING_UTILITY.cleanupTestDir();
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
index dbd3d7f8664..bd69f28e1ea 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -85,7 +84,6 @@ public class TestBucketCachePersister {
}
public BucketCache setupBucketCache(Configuration conf) throws IOException {
- conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, (testDir +
"/prefetch.persistence"));
BucketCache bucketCache = new BucketCache("file:" + testDir +
"/bucket.cache", capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
@@ -111,9 +109,7 @@ public class TestBucketCachePersister {
readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
Thread.sleep(bucketCachePersistInterval);
- assertTrue(new File(testDir + "/prefetch.persistence").exists());
assertTrue(new File(testDir + "/bucket.persistence").exists());
- assertTrue(new File(testDir + "/prefetch.persistence").delete());
assertTrue(new File(testDir + "/bucket.persistence").delete());
cleanupBucketCache(bucketCache);
}
@@ -128,7 +124,6 @@ public class TestBucketCachePersister {
// Load Cache
Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
- assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertFalse(new File(testDir + "/bucket.persistence").exists());
cleanupBucketCache(bucketCache);
}
@@ -144,10 +139,10 @@ public class TestBucketCachePersister {
readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache1);
Thread.sleep(500);
// Evict Blocks from cache
+ assertTrue(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName()));
BlockCacheKey bucketCacheKey =
bucketCache1.backingMap.entrySet().iterator().next().getKey();
- assertTrue(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
bucketCache1.evictBlock(bucketCacheKey);
- assertFalse(PrefetchExecutor.isFilePrefetched(storeFile.getName()));
+ assertFalse(bucketCache1.fullyCachedFiles.containsKey(storeFile.getName()));
}
public void readStoreFile(Path storeFilePath, long offset, FileSystem fs,
CacheConfig cacheConf,
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index 820e91aa6e8..b42e7be804d 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -49,7 +49,7 @@ public class TestByteBufferIOEngine {
private long off;
MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
- super(offset & 0xFF00, length, 0, false, (entry) -> {
+ super(offset & 0xFF00, length, length, 0, false, (entry) -> {
return ByteBuffAllocator.NONE;
}, allocator);
this.off = offset;
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
index 771ab0158f6..f15874bc61c 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestPrefetchPersistence.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -106,8 +105,6 @@ public class TestPrefetchPersistence {
conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
- prefetchPersistencePath = testDir + "/prefetch.persistence";
- conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY,
prefetchPersistencePath);
fs = HFileSystem.get(conf);
}
@@ -132,10 +129,10 @@ public class TestPrefetchPersistence {
bucketCache.shutdown();
assertTrue(new File(testDir + "/bucket.persistence").exists());
- assertTrue(new File(testDir + "/prefetch.persistence").exists());
bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
testDir + "/bucket.persistence", 60 * 1000, conf);
+ cacheConf = new CacheConfig(conf, bucketCache);
assertFalse(new File(testDir + "/bucket.persistence").exists());
assertFalse(new File(testDir + "/prefetch.persistence").exists());
assertTrue(usedSize != 0);
@@ -148,9 +145,9 @@ public class TestPrefetchPersistence {
public void closeStoreFile(Path path) throws Exception {
HFile.Reader reader = HFile.createReader(fs, path, cacheConf, true, conf);
- assertTrue(PrefetchExecutor.isFilePrefetched(path.getName()));
+ assertTrue(bucketCache.fullyCachedFiles.containsKey(path.getName()));
reader.close(true);
- assertFalse(PrefetchExecutor.isFilePrefetched(path.getName()));
+ assertFalse(bucketCache.fullyCachedFiles.containsKey(path.getName()));
}
public void readStoreFile(Path storeFilePath, long offset) throws Exception {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
index 0e777a4a7b9..58d9385f57e 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRAMCache.java
@@ -90,7 +90,7 @@ public class TestRAMCache {
MockHFileBlock blk = new MockHFileBlock(BlockType.DATA, size, size, -1,
ByteBuffer.wrap(byteArr, 0, size), HFileBlock.FILL_HEADER, -1, 52, -1,
new HFileContextBuilder().build(), ByteBuffAllocator.HEAP);
- RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false);
+ RAMQueueEntry re = new RAMQueueEntry(key, blk, 1, false, false);
Assert.assertNull(cache.putIfAbsent(key, re));
Assert.assertEquals(cache.putIfAbsent(key, re), re);
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index 3b2b9961b2b..6fdea844aa3 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -17,11 +17,15 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
+import static
org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.BufferedWriter;
+import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.nio.file.FileSystems;
@@ -32,12 +36,15 @@ import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
import org.apache.hadoop.hbase.io.hfile.Cacheable;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -147,16 +154,15 @@ public class TestVerifyBucketCacheFile {
@Test
public void testRetrieveFromFileAfterDelete() throws Exception {
-
HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
Path testDir = TEST_UTIL.getDataTestDir();
TEST_UTIL.getTestFileSystem().mkdirs(testDir);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 300);
-
- BucketCache bucketCache = new BucketCache("file:" + testDir +
"/bucket.cache", capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
- testDir + "/bucket.persistence", 60 * 1000, conf);
+ String mapFileName = testDir + "/bucket.persistence" +
EnvironmentEdgeManager.currentTime();
+ BucketCache bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 *
1000, conf);
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
@@ -171,14 +177,13 @@ public class TestVerifyBucketCacheFile {
// Shutdown BucketCache
bucketCache.shutdown();
// Delete the persistence file
- final java.nio.file.Path mapFile =
- FileSystems.getDefault().getPath(testDir.toString(),
"bucket.persistence");
- assertTrue(Files.deleteIfExists(mapFile));
+ File mapFile = new File(mapFileName);
+ assertTrue(mapFile.delete());
Thread.sleep(350);
// Create BucketCache
- bucketCache = new BucketCache("file:" + testDir + "/bucket.cache",
capacitySize,
- constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
- testDir + "/bucket.persistence", 60 * 1000, conf);
+ bucketCache =
+ new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize,
+ constructedBlockSizes, writeThreads, writerQLen, mapFileName, 60 *
1000, conf);
assertEquals(0, bucketCache.getAllocator().getUsedSize());
assertEquals(0, bucketCache.backingMap.size());
}
@@ -232,9 +237,15 @@ public class TestVerifyBucketCacheFile {
/**
* Test whether BucketCache is started normally after modifying the cache
file's last modified
* time. First Start BucketCache and add some blocks, then shutdown
BucketCache and persist cache
- * to file. Then Restart BucketCache after modify cache file's last modified
time, and it can't
- * restore cache from file, the cache file and persistence file would be
deleted before
- * BucketCache start normally.
+ * to file. Then restart BucketCache after modifying the cache file's last modified time. HBASE-XXXX
+ * changed the persistent cache so that an extra 8 bytes is now stored at the end of each block in
+ * the cache, representing the nanosecond time at which the block was cached. If the cache file
+ * fails checksum verification at load time, we go through all the cached blocks in the cache map
+ * and compare the cached time (a long) recorded in the map against the one stored in the cache
+ * file. If that check fails for a block, its cache key is removed from the map. Since this test
+ * only modifies the access time to induce a checksum error, the cache file content is still valid
+ * and the extra verification should confirm that all cache keys in the map are still recoverable
+ * from the cache.
* @throws Exception the exception
*/
@Test
@@ -249,6 +260,8 @@ public class TestVerifyBucketCacheFile {
long usedSize = bucketCache.getAllocator().getUsedSize();
assertEquals(0, usedSize);
+ Pair<String, Long> myPair = new Pair<>();
+
CacheTestUtils.HFileBlockPair[] blocks =
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
// Add blocks
@@ -257,6 +270,8 @@ public class TestVerifyBucketCacheFile {
}
usedSize = bucketCache.getAllocator().getUsedSize();
assertNotEquals(0, usedSize);
+ long blockCount = bucketCache.backingMap.size();
+ assertNotEquals(0, blockCount);
// persist cache to file
bucketCache.shutdown();
@@ -268,9 +283,64 @@ public class TestVerifyBucketCacheFile {
bucketCache =
new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
constructedBlockSize,
constructedBlockSizes, writeThreads, writerQLen, testDir +
"/bucket.persistence");
- assertEquals(0, bucketCache.getAllocator().getUsedSize());
- assertEquals(0, bucketCache.backingMap.size());
+ assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+ assertEquals(blockCount, bucketCache.backingMap.size());
+
+ TEST_UTIL.cleanupTestDir();
+ }
+
+ /**
+ * When using the persistent bucket cache, a crash may happen between persisting the backing map
+ * and syncing new blocks to the cache file itself, leaving the cache keys inconsistent with the
+ * cached data. This test makes sure that on recovery the cache keys are pruned accordingly, and
+ * that the keys which are still valid can retrieve their related block data from the cache
+ * without any corruption.
+ * @throws Exception the exception
+ */
+ @Test
+ public void testBucketCacheRecovery() throws Exception {
+ HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+ Path testDir = TEST_UTIL.getDataTestDir();
+ TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+ Configuration conf = HBaseConfiguration.create();
+ // Disables the persister thread by setting its interval to MAX_VALUE
+ conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+ String mapFileName = testDir + "/bucket.persistence" +
EnvironmentEdgeManager.currentTime();
+ BucketCache bucketCache = new BucketCache("file:" + testDir +
"/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, 4);
+ // Add three blocks
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(),
blocks[0].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(),
blocks[1].getBlock());
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(),
blocks[2].getBlock());
+ // saves the current state
+ bucketCache.persistToFile();
+ // evicts first block
+ bucketCache.evictBlock(blocks[0].getBlockName());
+
+ // now adds a fourth block to bucket cache
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(),
blocks[3].getBlock());
+ // Creates a new bucket cache instance without persisting the map again after evicting the
+ // first block and caching the fourth block. The bucket cache file now holds the last three
+ // blocks, but the backing map (containing the cache keys) was persisted while the first three
+ // blocks were in the cache. So the state on this recovery is:
+ // - Backing map: [block0, block1, block2]
+ // - Cache: [block1, block2, block3]
+ // Therefore, this bucket cache would be able to recover only block1 and
block2.
+ BucketCache newBucketCache = new BucketCache("file:" + testDir +
"/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+ assertNull(newBucketCache.getBlock(blocks[0].getBlockName(), false, false,
false));
+ assertEquals(blocks[1].getBlock(),
+ newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+ assertEquals(blocks[2].getBlock(),
+ newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+ assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false,
false));
+ assertEquals(2, newBucketCache.backingMap.size());
TEST_UTIL.cleanupTestDir();
}
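
The javadoc comments above describe the recovery validation only in prose, and the production-side logic is not part of the test diffs shown here. As a rough, hypothetical sketch of the idea (not the committed HBase implementation): each block in the cache file is followed by an 8-byte cached-time marker, and on recovery every persisted backing-map entry is checked against the marker on disk, with mismatching keys dropped. The class and method names below (BackingMapValidator, CachedBlockRef, validateBackingMap) are made up for illustration.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Iterator;
import java.util.Map;

public final class BackingMapValidator {

  /** Minimal stand-in for a persisted backing-map entry (hypothetical, not the HBase BucketEntry). */
  public static final class CachedBlockRef {
    final long offsetInCacheFile; // where the block starts in the cache file
    final int lengthWithHeader;   // block length on disk, excluding the trailing timestamp
    final long cachedTimeNanos;   // timestamp recorded when the block was cached

    public CachedBlockRef(long offsetInCacheFile, int lengthWithHeader, long cachedTimeNanos) {
      this.offsetInCacheFile = offsetInCacheFile;
      this.lengthWithHeader = lengthWithHeader;
      this.cachedTimeNanos = cachedTimeNanos;
    }
  }

  /**
   * Drops every entry whose trailing 8-byte timestamp in the cache file does not match the
   * timestamp persisted with the backing map, keeping only blocks that are safe to serve.
   */
  public static void validateBackingMap(Map<String, CachedBlockRef> backingMap,
      RandomAccessFile cacheFile) throws IOException {
    Iterator<Map.Entry<String, CachedBlockRef>> it = backingMap.entrySet().iterator();
    while (it.hasNext()) {
      CachedBlockRef ref = it.next().getValue();
      // The cached-time marker is assumed to sit right after the block payload on disk.
      cacheFile.seek(ref.offsetInCacheFile + ref.lengthWithHeader);
      long onDiskTime = cacheFile.readLong();
      if (onDiskTime != ref.cachedTimeNanos) {
        // Stale key: the block was evicted/overwritten or never fully synced before the crash.
        it.remove();
      }
    }
  }
}

Applied to the testBucketCacheRecovery scenario, such a check would keep block1 and block2 (their markers still match), drop block0 (its region in the file may have been reused after eviction), and never see block3, which was cached after the map was persisted; this matches the assertions in the test.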