This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 7ed0dc974f3 HBASE-28170 Put the cached time at the beginning of the block; run cache validation in the background when retrieving the persistent cache (#5471)
7ed0dc974f3 is described below

commit 7ed0dc974f35badef8f4656e693c81d2d745051a
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Tue Oct 24 09:30:11 2023 +0100

    HBASE-28170 Put the cached time at the beginning of the block; run cache validation in the background when retrieving the persistent cache (#5471)
    
    Signed-off-by: Peter Somogyi <[email protected]>
    
    (cherry-picked from commit 41057bbf357e308f7f65887c2e5fd92931276119)
---
 .../hadoop/hbase/io/hfile/HFilePreadReader.java    |  39 +++----
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 100 +++++++++++-----
 .../io/hfile/bucket/BucketCachePersister.java      |  41 +++++--
 .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java |  16 ++-
 .../io/hfile/TestPrefetchWithBucketCache.java      |   2 +-
 .../bucket/TestRecoveryPersistentBucketCache.java  | 126 +++++++++++++++++++++
 .../io/hfile/bucket/TestVerifyBucketCacheFile.java |  12 +-
 7 files changed, 266 insertions(+), 70 deletions(-)
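
Summary of the change for readers skimming the patch: with a persistent bucket cache, each
block is now laid out in the IOEngine as [cached time (8 bytes)][block payload][block
metadata], instead of the cached time being appended after the block. On recovery, the
8-byte prefix is compared against the cached time recorded in the backing map, and that
validation now runs in a background thread. Below is a minimal, illustrative sketch of the
layout check only; it is not the actual HBase code, and the Engine interface and the helper
names (writeEntry, isEntryStillValid) are assumptions made purely for clarity.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Stand-in for the real IOEngine; only the two calls needed by the sketch.
    interface Engine {
      void write(ByteBuffer src, long offset) throws IOException;
      void read(ByteBuffer dst, long offset) throws IOException;
    }

    final class PersistentLayoutSketch {
      // Write the cached time first, then the block bytes, mirroring the new ordering.
      static void writeEntry(Engine engine, long offset, long cachedTime, ByteBuffer block)
        throws IOException {
        ByteBuffer ts = ByteBuffer.allocate(Long.BYTES);
        ts.putLong(cachedTime);
        ts.rewind();
        engine.write(ts, offset);                 // 8-byte cached time prefix
        engine.write(block, offset + Long.BYTES); // block payload follows the prefix
      }

      // Recovery-time check: the prefix must match the bucket entry's cached time,
      // otherwise the slot was overwritten after the last persisted snapshot.
      static boolean isEntryStillValid(Engine engine, long offset, long expectedCachedTime)
        throws IOException {
        ByteBuffer ts = ByteBuffer.allocate(Long.BYTES);
        engine.read(ts, offset);
        ts.rewind();
        return ts.getLong() == expectedCachedTime;
      }
    }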

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index f9c0ae59242..1ac9a4ffb84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -40,7 +40,9 @@ public class HFilePreadReader extends HFileReaderImpl {
     Configuration conf) throws IOException {
     super(context, fileInfo, cacheConf, conf);
     final MutableBoolean fileAlreadyCached = new MutableBoolean(false);
-    BucketCache.getBuckedCacheFromCacheConfig(cacheConf).ifPresent(bc -> fileAlreadyCached
+    Optional<BucketCache> bucketCacheOptional =
+      BucketCache.getBucketCacheFromCacheConfig(cacheConf);
+    bucketCacheOptional.ifPresent(bc -> fileAlreadyCached
       .setValue(bc.getFullyCachedFiles().get(path.getName()) == null ? false : true));
     // Prefetch file blocks upon open if requested
     if (
@@ -65,8 +67,6 @@ public class HFilePreadReader extends HFileReaderImpl {
             if (LOG.isTraceEnabled()) {
               LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, 
end));
             }
-            Optional<BucketCache> bucketCacheOptional =
-              BucketCache.getBuckedCacheFromCacheConfig(cacheConf);
             // Don't use BlockIterator here, because it's designed to read load-on-open section.
             long onDiskSizeOfNextBlock = -1;
             while (offset < end) {
@@ -78,21 +78,24 @@ public class HFilePreadReader extends HFileReaderImpl {
               // to the next block without actually going read all the way to the cache.
               if (bucketCacheOptional.isPresent()) {
                 BucketCache cache = bucketCacheOptional.get();
-                BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
-                BucketEntry entry = cache.getBackingMap().get(cacheKey);
-                if (entry != null) {
-                  cacheKey = new BlockCacheKey(name, offset);
-                  entry = cache.getBackingMap().get(cacheKey);
-                  if (entry == null) {
-                    LOG.debug("No cache key {}, we'll read and cache it", cacheKey);
+                if (cache.getBackingMapValidated().get()) {
+                  BlockCacheKey cacheKey = new BlockCacheKey(name, offset);
+                  BucketEntry entry = cache.getBackingMap().get(cacheKey);
+                  if (entry != null) {
+                    cacheKey = new BlockCacheKey(name, offset);
+                    entry = cache.getBackingMap().get(cacheKey);
+                    if (entry == null) {
+                      LOG.debug("No cache key {}, we'll read and cache it", cacheKey);
+                    } else {
+                      offset += entry.getOnDiskSizeWithHeader();
+                      LOG.debug(
+                        "Found cache key {}. Skipping prefetch, the block is already cached.",
+                        cacheKey);
+                      continue;
+                    }
                   } else {
-                    offset += entry.getOnDiskSizeWithHeader();
-                    LOG.debug("Found cache key {}. Skipping prefetch, the block is already cached.",
-                      cacheKey);
-                    continue;
+                    LOG.debug("No entry in the backing map for cache key {}", cacheKey);
                   }
-                } else {
-                  LOG.debug("No entry in the backing map for cache key {}", cacheKey);
                 }
               }
              // Perhaps we got our block from cache? Unlikely as this may be, if it happens, then
@@ -111,9 +114,7 @@ public class HFilePreadReader extends HFileReaderImpl {
                 block.release();
               }
             }
-            BucketCache.getBuckedCacheFromCacheConfig(cacheConf)
-              .ifPresent(bc -> bc.fileCacheCompleted(path.getName()));
-
+            bucketCacheOptional.ifPresent(bc -> bc.fileCacheCompleted(path.getName()));
           } catch (IOException e) {
            // IOExceptions are probably due to region closes (relocation, etc.)
             if (LOG.isTraceEnabled()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5768b5b03d5..f7f6f8a8ee0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -144,6 +144,8 @@ public class BucketCache implements BlockCache, HeapSize {
   // In this map, store the block's meta data like offset, length
   transient Map<BlockCacheKey, BucketEntry> backingMap;
 
+  private AtomicBoolean backingMapValidated = new AtomicBoolean(false);
+
   /** Set of files for which prefetch is completed */
   final Map<String, Boolean> fullyCachedFiles = new ConcurrentHashMap<>();
 
@@ -301,7 +303,6 @@ public class BucketCache implements BlockCache, HeapSize {
 
     this.allocFailLogPrevTs = 0;
 
-    bucketAllocator = new BucketAllocator(capacity, bucketSizes);
     for (int i = 0; i < writerThreads.length; ++i) {
       writerQueues.add(new ArrayBlockingQueue<>(writerQLen));
     }
@@ -318,10 +319,14 @@ public class BucketCache implements BlockCache, HeapSize {
       try {
         retrieveFromFile(bucketSizes);
       } catch (IOException ioex) {
+        LOG.error("Can't restore from file[{}] because of ", persistencePath, ioex);
         backingMap.clear();
         fullyCachedFiles.clear();
-        LOG.error("Can't restore from file[" + persistencePath + "] because of ", ioex);
+        backingMapValidated.set(true);
+        bucketAllocator = new BucketAllocator(capacity, bucketSizes);
       }
+    } else {
+      bucketAllocator = new BucketAllocator(capacity, bucketSizes);
     }
     final String threadName = Thread.currentThread().getName();
     this.cacheEnabled = true;
@@ -374,6 +379,7 @@ public class BucketCache implements BlockCache, HeapSize {
   }
 
   void startBucketCachePersisterThread() {
+    LOG.info("Starting BucketCachePersisterThread");
     cachePersister = new BucketCachePersister(this, bucketcachePersistInterval);
     cachePersister.setDaemon(true);
     cachePersister.start();
@@ -529,6 +535,7 @@ public class BucketCache implements BlockCache, HeapSize {
     } else {
       this.blockNumber.increment();
       this.heapSize.add(cachedItem.heapSize());
+      blocksByHFile.add(cacheKey);
     }
   }
 
@@ -589,6 +596,7 @@ public class BucketCache implements BlockCache, HeapSize {
         // the cache map state might differ from the actual cache. If we reach this block,
         // we should remove the cache key entry from the backing map
         backingMap.remove(key);
+        fullyCachedFiles.remove(key.getHfileName());
         LOG.debug("Failed to fetch block for cache key: {}.", key, hioex);
       } catch (IOException ioex) {
         LOG.error("Failed reading block " + key + " from bucket cache", ioex);
@@ -684,6 +692,7 @@ public class BucketCache implements BlockCache, HeapSize {
     } else {
       return bucketEntryToUse.withWriteLock(offsetLock, () -> {
         if (backingMap.remove(cacheKey, bucketEntryToUse)) {
+          LOG.debug("removed key {} from back map in the evict process", cacheKey);
           blockEvicted(cacheKey, bucketEntryToUse, !existedInRamCache, evictedByEvictionProcess);
           return true;
         }
@@ -1255,6 +1264,8 @@ public class BucketCache implements BlockCache, HeapSize {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
       justification = "false positive, try-with-resources ensures close is called.")
   void persistToFile() throws IOException {
+    LOG.debug("Thread {} started persisting bucket cache to file",
+      Thread.currentThread().getName());
     if (!isCachePersistent()) {
       throw new IOException("Attempt to persist non-persistent cache 
mappings!");
     }
@@ -1262,14 +1273,19 @@ public class BucketCache implements BlockCache, HeapSize {
     try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
       fos.write(ProtobufMagic.PB_MAGIC);
       BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
+    } catch (IOException e) {
+      LOG.error("Failed to persist bucket cache to file", e);
+      throw e;
     }
+    LOG.debug("Thread {} finished persisting bucket cache to file, renaming",
+      Thread.currentThread().getName());
     if (!tempPersistencePath.renameTo(new File(persistencePath))) {
       LOG.warn("Failed to commit cache persistent file. We might lose cached 
blocks if "
         + "RS crashes/restarts before we successfully checkpoint again.");
     }
   }
 
-  private boolean isCachePersistent() {
+  public boolean isCachePersistent() {
     return ioEngine.isPersistent() && persistencePath != null;
   }
 
@@ -1277,8 +1293,14 @@ public class BucketCache implements BlockCache, HeapSize {
    * @see #persistToFile()
    */
   private void retrieveFromFile(int[] bucketSizes) throws IOException {
+    LOG.info("Started retrieving bucket cache from file");
     File persistenceFile = new File(persistencePath);
     if (!persistenceFile.exists()) {
+      LOG.warn("Persistence file missing! "
+        + "It's ok if it's first run after enabling persistent cache.");
+      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
+      blockNumber.add(backingMap.size());
+      backingMapValidated.set(true);
       return;
     }
     assert !cacheEnabled;
@@ -1296,6 +1318,7 @@ public class BucketCache implements BlockCache, HeapSize {
       parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
       bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
       blockNumber.add(backingMap.size());
+      LOG.info("Bucket cache retrieved from file successfully");
     }
   }
 
@@ -1368,27 +1391,43 @@ public class BucketCache implements BlockCache, HeapSize {
       try {
         ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
           algorithm);
+        backingMapValidated.set(true);
       } catch (IOException e) {
         LOG.warn("Checksum for cache file failed. "
-          + "We need to validate each cache key in the backing map. This may take some time...");
-        long startTime = EnvironmentEdgeManager.currentTime();
-        int totalKeysOriginally = backingMap.size();
-        for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
-          try {
-            ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
-          } catch (IOException e1) {
-            LOG.debug("Check for key {} failed. Removing it from map.", keyEntry.getKey());
-            backingMap.remove(keyEntry.getKey());
-            fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
+          + "We need to validate each cache key in the backing map. "
+          + "This may take some time, so we'll do it in a background thread.");
+        Runnable cacheValidator = () -> {
+          while (bucketAllocator == null) {
+            try {
+              Thread.sleep(50);
+            } catch (InterruptedException ex) {
+              throw new RuntimeException(ex);
+            }
           }
-        }
-        LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
-          totalKeysOriginally, backingMap.size(),
-          (EnvironmentEdgeManager.currentTime() - startTime));
+          long startTime = EnvironmentEdgeManager.currentTime();
+          int totalKeysOriginally = backingMap.size();
+          for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
+            try {
+              ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
+            } catch (IOException e1) {
+              LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
+              evictBlock(keyEntry.getKey());
+              fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
+            }
+          }
+          backingMapValidated.set(true);
+          LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
+            totalKeysOriginally, backingMap.size(),
+            (EnvironmentEdgeManager.currentTime() - startTime));
+        };
+        Thread t = new Thread(cacheValidator);
+        t.setDaemon(true);
+        t.start();
       }
     } else {
       // if has not checksum, it means the persistence file is old format
       LOG.info("Persistent file is old format, it does not support verifying 
file integrity!");
+      backingMapValidated.set(true);
     }
     verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
   }
@@ -1417,6 +1456,7 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private void disableCache() {
     if (!cacheEnabled) return;
+    LOG.info("Disabling cache");
     cacheEnabled = false;
     ioEngine.shutdown();
     this.scheduleThreadPool.shutdown();
@@ -1441,11 +1481,15 @@ public class BucketCache implements BlockCache, HeapSize {
     LOG.info("Shutdown bucket cache: IO persistent=" + ioEngine.isPersistent() + "; path to write="
       + persistencePath);
     if (ioEngine.isPersistent() && persistencePath != null) {
-      if (cachePersister != null) {
-        cachePersister.interrupt();
-      }
       try {
         join();
+        if (cachePersister != null) {
+          LOG.info("Shutting down cache persister thread.");
+          cachePersister.shutdown();
+          while (cachePersister.isAlive()) {
+            Thread.sleep(10);
+          }
+        }
         persistToFile();
       } catch (IOException ex) {
         LOG.error("Unable to persist data on exit: " + ex.toString(), ex);
@@ -1650,17 +1694,17 @@ public class BucketCache implements BlockCache, HeapSize {
           HFileBlock block = (HFileBlock) data;
           ByteBuff sliceBuf = block.getBufferReadOnly();
           block.getMetaData(metaBuff);
-          ioEngine.write(sliceBuf, offset);
-          // adds the cache time after the block and metadata part
+          // adds the cache time prior to the block and metadata part
           if (isCachePersistent) {
-            ioEngine.write(metaBuff, offset + len - metaBuff.limit() - Long.BYTES);
             ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
             buffer.putLong(bucketEntry.getCachedTime());
             buffer.rewind();
-            ioEngine.write(buffer, (offset + len - Long.BYTES));
+            ioEngine.write(buffer, offset);
+            ioEngine.write(sliceBuf, (offset + Long.BYTES));
           } else {
-            ioEngine.write(metaBuff, offset + len - metaBuff.limit());
+            ioEngine.write(sliceBuf, offset);
           }
+          ioEngine.write(metaBuff, offset + len - metaBuff.limit());
         } else {
           // Only used for testing.
           ByteBuffer bb = ByteBuffer.allocate(len);
@@ -1902,11 +1946,15 @@ public class BucketCache implements BlockCache, HeapSize {
     return backingMap;
   }
 
+  public AtomicBoolean getBackingMapValidated() {
+    return backingMapValidated;
+  }
+
   public Map<String, Boolean> getFullyCachedFiles() {
     return fullyCachedFiles;
   }
 
-  public static Optional<BucketCache> getBuckedCacheFromCacheConfig(CacheConfig cacheConf) {
+  public static Optional<BucketCache> getBucketCacheFromCacheConfig(CacheConfig cacheConf) {
     if (cacheConf.getBlockCache().isPresent()) {
       BlockCache bc = cacheConf.getBlockCache().get();
       if (bc instanceof CombinedBlockCache) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
index dbea4f3f325..e4382d2561e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +29,8 @@ public class BucketCachePersister extends Thread {
   private final long intervalMillis;
   private static final Logger LOG = LoggerFactory.getLogger(BucketCachePersister.class);
 
+  private AtomicBoolean shutdown = new AtomicBoolean(false);
+
   public BucketCachePersister(BucketCache cache, long intervalMillis) {
     super("bucket-cache-persister");
     this.cache = cache;
@@ -36,20 +39,34 @@ public class BucketCachePersister extends Thread {
   }
 
   public void run() {
-    while (true) {
-      try {
-        Thread.sleep(intervalMillis);
-        if (cache.isCacheInconsistent()) {
-          LOG.debug("Cache is inconsistent, persisting to disk");
-          cache.persistToFile();
-          cache.setCacheInconsistent(false);
+    try {
+      while (true) {
+        try {
+          Thread.sleep(intervalMillis);
+          if (cache.isCacheInconsistent()) {
+            LOG.debug("Cache is inconsistent, persisting to disk");
+            cache.persistToFile();
+            cache.setCacheInconsistent(false);
+          }
+          // Thread.interrupt may cause an InterruptedException inside the util method used for
+          // checksum calculation in persistToFile. This util currently swallows the exception,
+          // causing this thread to not get interrupted, so we added this flag to indicate that
+          // the persister thread should stop.
+          if (shutdown.get()) {
+            break;
+          }
+        } catch (IOException e) {
+          LOG.warn("Exception in BucketCachePersister.", e);
         }
-      } catch (IOException e) {
-        LOG.warn("IOException in BucketCachePersister {} ", e.getMessage());
-      } catch (InterruptedException iex) {
-        LOG.warn("InterruptedException in BucketCachePersister {} ", iex.getMessage());
-        break;
       }
+      LOG.info("Finishing cache persister thread.");
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupting BucketCachePersister thread.", e);
     }
   }
+
+  public void shutdown() {
+    this.shutdown.set(true);
+    this.interrupt();
+  }
 }
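
The persister change above replaces a bare interrupt() with a cooperative shutdown flag,
because the interrupt can be swallowed by the checksum utility invoked from persistToFile().
A minimal sketch of that pattern follows; it is illustrative only, and the class and method
names (PeriodicWorker, requestShutdown, doWork) are not the actual HBase ones.

    import java.util.concurrent.atomic.AtomicBoolean;

    class PeriodicWorker extends Thread {
      private final AtomicBoolean shutdown = new AtomicBoolean(false);
      private final long intervalMillis;

      PeriodicWorker(long intervalMillis) {
        super("periodic-worker");
        this.intervalMillis = intervalMillis;
      }

      @Override
      public void run() {
        try {
          while (true) {
            Thread.sleep(intervalMillis);
            doWork();             // may swallow an interrupt internally
            if (shutdown.get()) { // the flag is still observed even if it does
              break;
            }
          }
        } catch (InterruptedException e) {
          // interrupt delivered while sleeping: also treated as a stop request
        }
      }

      void requestShutdown() {
        shutdown.set(true); // set the flag first, then wake the thread if it is sleeping
        interrupt();
      }

      private void doWork() {
        // placeholder for the periodic task (e.g. persisting cache metadata to disk)
      }
    }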
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index 38f9db04b6d..ba431c4c6dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -149,15 +149,14 @@ public class FileIOEngine extends PersistentIOEngine {
       }
     }
     if (maintainPersistence) {
-      dstBuff.position(length - Long.BYTES);
+      dstBuff.rewind();
       long cachedNanoTime = dstBuff.getLong();
       if (be.getCachedTime() != cachedNanoTime) {
         dstBuff.release();
-        throw new HBaseIOException("The cached time recorded within the cached block differs "
-          + "from its bucket entry, so it might not be the same.");
+        throw new HBaseIOException("The cached time recorded within the cached block: "
+          + cachedNanoTime + " differs from its bucket entry: " + be.getCachedTime());
       }
-      dstBuff.rewind();
-      dstBuff.limit(length - Long.BYTES);
+      dstBuff.limit(length);
       dstBuff = dstBuff.slice();
     } else {
       dstBuff.rewind();
@@ -167,10 +166,9 @@ public class FileIOEngine extends PersistentIOEngine {
 
   void checkCacheTime(BucketEntry be) throws IOException {
     long offset = be.offset();
-    int length = be.getLength();
     ByteBuff dstBuff = be.allocator.allocate(Long.BYTES);
     try {
-      accessFile(readAccessor, dstBuff, (offset + length - Long.BYTES));
+      accessFile(readAccessor, dstBuff, offset);
     } catch (IOException ioe) {
       dstBuff.release();
       throw ioe;
@@ -179,8 +177,8 @@ public class FileIOEngine extends PersistentIOEngine {
     long cachedNanoTime = dstBuff.getLong();
     if (be.getCachedTime() != cachedNanoTime) {
       dstBuff.release();
-      throw new HBaseIOException("The cached time recorded within the cached block differs "
-        + "from its bucket entry, so it might not be the same.");
+      throw new HBaseIOException("The cached time recorded within the cached block: "
+        + cachedNanoTime + " differs from its bucket entry: " + be.getCachedTime());
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
index 972d4942819..768e41019d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
@@ -106,7 +106,7 @@ public class TestPrefetchWithBucketCache {
     // Prefetches the file blocks
     LOG.debug("First read should prefetch the blocks.");
     readStoreFile(storeFile);
-    BucketCache bc = BucketCache.getBuckedCacheFromCacheConfig(cacheConf).get();
+    BucketCache bc = BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
     // Our file should have 6 DATA blocks. We should wait for all of them to be cached
     Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
     Map<BlockCacheKey, BucketEntry> snapshot = ImmutableMap.copyOf(bc.getBackingMap());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
new file mode 100644
index 00000000000..5c01a6003e8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestRecoveryPersistentBucketCache.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Basic test for checking the file's integrity before starting BucketCache with fileIOEngine
+ */
+@Category(SmallTests.class)
+public class TestRecoveryPersistentBucketCache {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRecoveryPersistentBucketCache.class);
+
+  final long capacitySize = 32 * 1024 * 1024;
+  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
+  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
+
+  @Test
+  public void testBucketCacheRecovery() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    int[] bucketSizes = new int[] { 8 * 1024 + 1024 };
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+    CacheTestUtils.HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(8192, 4);
+
+    CacheTestUtils.HFileBlockPair[] smallerBlocks = CacheTestUtils.generateHFileBlocks(4096, 1);
+    // Add four blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[0].getBlockName(), blocks[0].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[1].getBlockName(), blocks[1].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[2].getBlockName(), blocks[2].getBlock());
+    cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[3].getBlockName(), blocks[3].getBlock());
+    // saves the current state of the cache
+    bucketCache.persistToFile();
+    // evicts the 4th block
+    bucketCache.evictBlock(blocks[3].getBlockName());
+    // now adds a 5th block to bucket cache. This block is half the size of the previous
+    // blocks, and it will be added in the same offset of the previous evicted block.
+    // This overwrites part of the 4th block. Because we persisted only up to the
+    // 4th block addition, recovery would try to read the whole 4th block, but the cached time
+    // validation will fail, and we'll recover only the first three blocks
+    cacheAndWaitUntilFlushedToBucket(bucketCache, smallerBlocks[0].getBlockName(),
+      smallerBlocks[0].getBlock());
+
+    // Creates new bucket cache instance without persisting to file after evicting 4th block
+    // and caching 5th block. Here the cache file has the first three blocks, followed by the
+    // 5th block and the second half of 4th block (we evicted 4th block, freeing up its
+    // offset in the cache, then added 5th block which is half the size of other blocks, so it's
+    // going to override the first half of the 4th block in the cache). That's fine because
+    // the in-memory backing map has the right blocks and related offsets. However, the
+    // persistent map file only has information about the first four blocks. We validate the
+    // cache time recorded in the backing map against the block data in the cache. This is recorded
+    // in the cache as the first 8 bytes of a block, so the 4th block had its first 8 bytes
+    // now overridden by the 5th block, causing this check to fail and removal of
+    // the 4th block from the backing map.
+    BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      8192, bucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence",
+      DEFAULT_ERROR_TOLERATION_DURATION, conf);
+    Thread.sleep(100);
+    assertEquals(3, newBucketCache.backingMap.size());
+    assertNull(newBucketCache.getBlock(blocks[3].getBlockName(), false, false, false));
+    assertNull(newBucketCache.getBlock(smallerBlocks[0].getBlockName(), false, false, false));
+    assertEquals(blocks[0].getBlock(),
+      newBucketCache.getBlock(blocks[0].getBlockName(), false, false, false));
+    assertEquals(blocks[1].getBlock(),
+      newBucketCache.getBlock(blocks[1].getBlockName(), false, false, false));
+    assertEquals(blocks[2].getBlock(),
+      newBucketCache.getBlock(blocks[2].getBlockName(), false, false, false));
+    TEST_UTIL.cleanupTestDir();
+  }
+
+  private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
+    throws InterruptedException {
+    while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {
+      Thread.sleep(100);
+    }
+  }
+
+  // BucketCache.cacheBlock is async, it first adds block to ramCache and writeQueue, then writer
+  // threads will flush it to the bucket and put reference entry in backingMap.
+  private void cacheAndWaitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey,
+    Cacheable block) throws InterruptedException {
+    cache.cacheBlock(cacheKey, block);
+    waitUntilFlushedToBucket(cache, cacheKey);
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index 64dcd6c7067..474679c4aa5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -127,6 +127,7 @@ public class TestVerifyBucketCacheFile {
     bucketCache =
       new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
         constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
+    Thread.sleep(100);
     assertEquals(0, bucketCache.getAllocator().getUsedSize());
     assertEquals(0, bucketCache.backingMap.size());
     // Add blocks
@@ -146,6 +147,7 @@ public class TestVerifyBucketCacheFile {
     bucketCache =
       new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
         constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
+    Thread.sleep(100);
     assertEquals(0, bucketCache.getAllocator().getUsedSize());
     assertEquals(0, bucketCache.backingMap.size());
 
@@ -202,9 +204,12 @@ public class TestVerifyBucketCacheFile {
     Path testDir = TEST_UTIL.getDataTestDir();
     TEST_UTIL.getTestFileSystem().mkdirs(testDir);
 
-    BucketCache bucketCache =
-      new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, constructedBlockSize,
-        constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence");
+    Configuration conf = HBaseConfiguration.create();
+    // Disables the persister thread by setting its interval to MAX_VALUE
+    conf.setLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, Long.MAX_VALUE);
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+      testDir + "/bucket.persistence", DEFAULT_ERROR_TOLERATION_DURATION, conf);
     long usedSize = bucketCache.getAllocator().getUsedSize();
     assertEquals(0, usedSize);
 
@@ -229,6 +234,7 @@ public class TestVerifyBucketCacheFile {
     bucketCache =
       new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, 
constructedBlockSize,
         constructedBlockSizes, writeThreads, writerQLen, testDir + 
"/bucket.persistence");
+    Thread.sleep(100);
     assertEquals(0, bucketCache.getAllocator().getUsedSize());
     assertEquals(0, bucketCache.backingMap.size());
 
