This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new d136c6d7c57 HBASE-27686: Recovery of BucketCache and Prefetched data 
after RS Crash (#5080)
d136c6d7c57 is described below

commit d136c6d7c578a7d8d72c51f86a08fec1828f0a8e
Author: Kota-SH <[email protected]>
AuthorDate: Thu Mar 16 09:27:55 2023 -0500

    HBASE-27686: Recovery of BucketCache and Prefetched data after RS Crash 
(#5080)
    
    Signed-off-by: Wellington Ramos Chevreuil <[email protected]>
    (cherry picked from commit 58cb1f4799e90f5fca51f76aa4a9787d823c69a5)
---
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java  |   6 +
 .../hadoop/hbase/io/hfile/PrefetchExecutor.java    |   2 +-
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  29 ++-
 .../io/hfile/bucket/BucketCachePersister.java      |  52 ++++++
 .../hadoop/hbase/io/hfile/TestPrefetchRSClose.java |  26 ++-
 .../io/hfile/bucket/TestBucketCachePersister.java  | 194 +++++++++++++++++++++
 6 files changed, 290 insertions(+), 19 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 82dc6d1abfc..1a9ff7cef33 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -95,6 +95,12 @@ public class CacheConfig {
 
   public static final String PREFETCH_PERSISTENCE_PATH_KEY = 
"hbase.prefetch.file.list.path";
 
+  /**
+   * Configuration key to set interval for persisting bucket cache to disk.
+   */
+  public static final String BUCKETCACHE_PERSIST_INTERVAL_KEY =
+    "hbase.bucketcache.persist.intervalinmillis";
+
   // Defaults
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
   public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
index 9aafe7a7b6e..b30150fcb6d 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -154,7 +154,7 @@ public final class PrefetchExecutor {
       throw new IOException("Error persisting prefetched HFiles set!");
     }
     if (!prefetchCompleted.isEmpty()) {
-      try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, 
true)) {
+      try (FileOutputStream fos = new FileOutputStream(prefetchedFileListPath, 
false)) {
         PrefetchProtoUtils.toPB(prefetchCompleted).writeDelimitedTo(fos);
       }
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index ad6510cb2fb..a9001c447f8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
 import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY;
 
 import java.io.File;
@@ -173,6 +174,7 @@ public class BucketCache implements BlockCache, HeapSize {
   private final BucketCacheStats cacheStats = new BucketCacheStats();
 
   private final String persistencePath;
+  static AtomicBoolean isCacheInconsistent = new AtomicBoolean(false);
   private final long cacheCapacity;
   /** Approximate block size */
   private final long blockSize;
@@ -233,6 +235,8 @@ public class BucketCache implements BlockCache, HeapSize {
 
   private String prefetchedFileListPath;
 
+  private long bucketcachePersistInterval;
+
   private static final String FILE_VERIFY_ALGORITHM =
     "hbase.bucketcache.persistent.file.integrity.check.algorithm";
   private static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
@@ -278,6 +282,7 @@ public class BucketCache implements BlockCache, HeapSize {
     this.queueAdditionWaitTime =
       conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
     this.prefetchedFileListPath = conf.get(PREFETCH_PERSISTENCE_PATH_KEY);
+    this.bucketcachePersistInterval = 
conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
 
     sanityCheckConfigs();
 
@@ -303,6 +308,7 @@ public class BucketCache implements BlockCache, HeapSize {
     this.backingMap = new ConcurrentHashMap<>((int) blockNumCapacity);
 
     if (ioEngine.isPersistent() && persistencePath != null) {
+      startBucketCachePersisterThread();
       try {
         retrieveFromFile(bucketSizes);
       } catch (IOException ioex) {
@@ -359,6 +365,12 @@ public class BucketCache implements BlockCache, HeapSize {
     }
   }
 
+  void startBucketCachePersisterThread() {
+    BucketCachePersister cachePersister =
+      new BucketCachePersister(this, bucketcachePersistInterval);
+    cachePersister.start();
+  }
+
   boolean isCacheEnabled() {
     return this.cacheEnabled;
   }
@@ -586,6 +598,9 @@ public class BucketCache implements BlockCache, HeapSize {
     if (evictedByEvictionProcess) {
       cacheStats.evicted(bucketEntry.getCachedTime(), cacheKey.isPrimary());
     }
+    if (ioEngine.isPersistent()) {
+      setCacheInconsistent(true);
+    }
   }
 
   /**
@@ -710,6 +725,14 @@ public class BucketCache implements BlockCache, HeapSize {
     });
   }
 
+  public boolean isCacheInconsistent() {
+    return isCacheInconsistent.get();
+  }
+
+  public void setCacheInconsistent(boolean setCacheInconsistent) {
+    isCacheInconsistent.set(setCacheInconsistent);
+  }
+
   /*
    * Statistics thread. Periodically output cache statistics to the log.
    */
@@ -1156,6 +1179,9 @@ public class BucketCache implements BlockCache, HeapSize {
       // Only add if non-null entry.
       if (bucketEntries[i] != null) {
         putIntoBackingMap(key, bucketEntries[i]);
+        if (ioEngine.isPersistent()) {
+          setCacheInconsistent(true);
+        }
       }
       // Always remove from ramCache even if we failed adding it to the block 
cache above.
       boolean existed = ramCache.remove(key, re -> {
@@ -1205,8 +1231,7 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = 
"OBL_UNSATISFIED_OBLIGATION",
       justification = "false positive, try-with-resources ensures close is 
called.")
-  private void persistToFile() throws IOException {
-    assert !cacheEnabled;
+  void persistToFile() throws IOException {
     if (!ioEngine.isPersistent()) {
       throw new IOException("Attempt to persist non-persistent cache 
mappings!");
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
new file mode 100644
index 00000000000..099a19db0a1
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCachePersister.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class BucketCachePersister extends Thread {
+  private final BucketCache cache;
+  private final long intervalMillis;
+  private static final Logger LOG = 
LoggerFactory.getLogger(BucketCachePersister.class);
+
+  public BucketCachePersister(BucketCache cache, long intervalMillis) {
+    super("bucket-cache-persister");
+    this.cache = cache;
+    this.intervalMillis = intervalMillis;
+    LOG.info("BucketCachePersister started with interval: " + intervalMillis);
+  }
+
+  public void run() {
+    while (true) {
+      try {
+        Thread.sleep(intervalMillis);
+        if (cache.isCacheInconsistent()) {
+          LOG.debug("Cache is inconsistent, persisting to disk");
+          cache.persistToFile();
+          cache.setCacheInconsistent(false);
+        }
+      } catch (IOException | InterruptedException e) {
+        LOG.warn("Exception in BucketCachePersister" + e.getMessage());
+      }
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
index edf65d9ba29..b0c4cafb2de 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchRSClose.java
@@ -82,7 +82,8 @@ public class TestPrefetchRSClose {
   }
 
   @Test
-  public void testRegionClosePrefetchPersistence() throws Exception {
+  public void testPrefetchPersistence() throws Exception {
+
     // Write to table and flush
     TableName tableName = TableName.valueOf("table1");
     byte[] row0 = Bytes.toBytes("row1");
@@ -106,8 +107,14 @@ public class TestPrefetchRSClose {
       table.put(put1);
       TEST_UTIL.flush(tableName);
     } finally {
-      Thread.sleep(1000);
+      Thread.sleep(1500);
     }
+
+    // Default interval for cache persistence is 1000ms. So after 1000ms, both 
the persistence files
+    // should exist.
+    assertTrue(new File(testDir + "/bucket.persistence").exists());
+    assertTrue(new File(testDir + "/prefetch.persistence").exists());
+
     // Stop the RS
     cluster.stopRegionServer(0);
     LOG.info("Stopped Region Server 0.");
@@ -117,20 +124,6 @@ public class TestPrefetchRSClose {
 
     // Start the RS and validate
     cluster.startRegionServer();
-    Thread.sleep(1000);
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
-    assertFalse(new File(testDir + "/bucket.persistence").exists());
-  }
-
-  @Test
-  public void testPrefetchPersistenceNegative() throws Exception {
-    cluster.stopRegionServer(0);
-    LOG.info("Stopped Region Server 0.");
-    Thread.sleep(1000);
-    assertFalse(new File(testDir + "/prefetch.persistence").exists());
-    assertTrue(new File(testDir + "/bucket.persistence").exists());
-    cluster.startRegionServer();
-    Thread.sleep(1000);
     assertFalse(new File(testDir + "/prefetch.persistence").exists());
     assertFalse(new File(testDir + "/bucket.persistence").exists());
   }
@@ -138,6 +131,7 @@ public class TestPrefetchRSClose {
   @After
   public void tearDown() throws Exception {
     TEST_UTIL.shutdownMiniCluster();
+    TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
     if (zkCluster != null) {
       zkCluster.shutdown();
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
new file mode 100644
index 00000000000..171d62d8b2c
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.fs.HFileSystem;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.RandomKeyValueUtil;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category({ IOTests.class, MediumTests.class })
+public class TestBucketCachePersister {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBucketCachePersister.class);
+
+  public TestName name = new TestName();
+
+  public int constructedBlockSize = 16 * 1024;
+
+  public int[] constructedBlockSizes =
+    new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 
1024,
+      28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 
128 * 1024 + 1024 };
+
+  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length 
- 2;
+  private static final int DATA_BLOCK_SIZE = 2048;
+  private static final int NUM_KV = 1000;
+
+  final long capacitySize = 32 * 1024 * 1024;
+  final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS;
+  final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS;
+  Path testDir;
+
+  public Configuration setupBucketCacheConfig(long bucketCachePersistInterval) 
throws IOException {
+    Configuration conf;
+    conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean(CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY, true);
+    testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+    conf.setLong(CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY, 
bucketCachePersistInterval);
+    return conf;
+  }
+
+  public BucketCache setupBucketCache(Configuration conf) throws IOException {
+    conf.set(CacheConfig.PREFETCH_PERSISTENCE_PATH_KEY, (testDir + 
"/prefetch.persistence"));
+    BucketCache bucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
+      constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen,
+      testDir + "/bucket.persistence", 60 * 1000, conf);
+    return bucketCache;
+  }
+
+  public void cleanupBucketCache(BucketCache bucketCache) throws IOException {
+    bucketCache.shutdown();
+    TEST_UTIL.cleanupDataTestDirOnTestFS(String.valueOf(testDir));
+    assertFalse(TEST_UTIL.getTestFileSystem().exists(testDir));
+  }
+
+  @Test
+  public void testPrefetchPersistenceCrash() throws Exception {
+    long bucketCachePersistInterval = 3000;
+    Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
+    BucketCache bucketCache = setupBucketCache(conf);
+    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
+    FileSystem fs = HFileSystem.get(conf);
+    // Load Cache
+    Path storeFile = writeStoreFile("TestPrefetch0", conf, cacheConf, fs);
+    Path storeFile2 = writeStoreFile("TestPrefetch1", conf, cacheConf, fs);
+    readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
+    readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
+    Thread.sleep(bucketCachePersistInterval);
+    assertTrue(new File(testDir + "/prefetch.persistence").exists());
+    assertTrue(new File(testDir + "/bucket.persistence").exists());
+    assertTrue(new File(testDir + "/prefetch.persistence").delete());
+    assertTrue(new File(testDir + "/bucket.persistence").delete());
+    cleanupBucketCache(bucketCache);
+  }
+
+  @Test
+  public void testPrefetchPersistenceCrashNegative() throws Exception {
+    long bucketCachePersistInterval = 3000;
+    Configuration conf = setupBucketCacheConfig(bucketCachePersistInterval);
+    BucketCache bucketCache = setupBucketCache(conf);
+    CacheConfig cacheConf = new CacheConfig(conf, bucketCache);
+    FileSystem fs = HFileSystem.get(conf);
+    // Load Cache
+    Path storeFile = writeStoreFile("TestPrefetch2", conf, cacheConf, fs);
+    Path storeFile2 = writeStoreFile("TestPrefetch3", conf, cacheConf, fs);
+    readStoreFile(storeFile, 0, fs, cacheConf, conf, bucketCache);
+    readStoreFile(storeFile2, 0, fs, cacheConf, conf, bucketCache);
+    assertFalse(new File(testDir + "/prefetch.persistence").exists());
+    assertFalse(new File(testDir + "/bucket.persistence").exists());
+    cleanupBucketCache(bucketCache);
+  }
+
+  public void readStoreFile(Path storeFilePath, long offset, FileSystem fs, 
CacheConfig cacheConf,
+    Configuration conf, BucketCache bucketCache) throws Exception {
+    // Open the file
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConf, 
true, conf);
+
+    while (!reader.prefetchComplete()) {
+      // Sleep for a bit
+      Thread.sleep(1000);
+    }
+    HFileBlock block = reader.readBlock(offset, -1, false, true, false, true, 
null, null);
+    BlockCacheKey blockCacheKey = new BlockCacheKey(reader.getName(), offset);
+    BucketEntry be = bucketCache.backingMap.get(blockCacheKey);
+    boolean isCached = bucketCache.getBlock(blockCacheKey, true, false, true) 
!= null;
+
+    if (
+      block.getBlockType() == BlockType.DATA || block.getBlockType() == 
BlockType.ROOT_INDEX
+        || block.getBlockType() == BlockType.INTERMEDIATE_INDEX
+    ) {
+      assertTrue(isCached);
+    }
+  }
+
+  public Path writeStoreFile(String fname, Configuration conf, CacheConfig 
cacheConf, FileSystem fs)
+    throws IOException {
+    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+    HFileContext meta = new 
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+    StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
+      .withOutputDir(storeFileParentDir).withFileContext(meta).build();
+    Random rand = ThreadLocalRandom.current();
+    final int rowLen = 32;
+    for (int i = 0; i < NUM_KV; ++i) {
+      byte[] k = RandomKeyValueUtil.randomOrderedKey(rand, i);
+      byte[] v = RandomKeyValueUtil.randomValue(rand);
+      int cfLen = rand.nextInt(k.length - rowLen + 1);
+      KeyValue kv = new KeyValue(k, 0, rowLen, k, rowLen, cfLen, k, rowLen + 
cfLen,
+        k.length - rowLen - cfLen, rand.nextLong(), generateKeyType(rand), v, 
0, v.length);
+      sfw.append(kv);
+    }
+
+    sfw.close();
+    return sfw.getPath();
+  }
+
+  public static KeyValue.Type generateKeyType(Random rand) {
+    if (rand.nextBoolean()) {
+      // Let's make half of KVs puts.
+      return KeyValue.Type.Put;
+    } else {
+      KeyValue.Type keyType = KeyValue.Type.values()[1 + 
rand.nextInt(NUM_VALID_KEY_TYPES)];
+      if (keyType == KeyValue.Type.Minimum || keyType == 
KeyValue.Type.Maximum) {
+        throw new RuntimeException("Generated an invalid key type: " + keyType 
+ ". "
+          + "Probably the layout of KeyValue.Type has changed.");
+      }
+      return keyType;
+    }
+  }
+
+}

Reply via email to