This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 44cb5009045 HBASE-28805: Chunked persistence of backing map for persistent bucket cache. (#6183)
44cb5009045 is described below

commit 44cb5009045b3291532c64c19efb1a8750074725
Author: jhungund <[email protected]>
AuthorDate: Tue Sep 3 21:18:14 2024 +0530

    HBASE-28805: Chunked persistence of backing map for persistent bucket cache. (#6183)
    
    Signed-off-by: Wellington Chevreuil <[email protected]>
    (cherry picked from commit 84655ded27bbd6d8aada9707d06f23c65d2c4746)
---
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 190 ++++++++++++++++-----
 .../hbase/io/hfile/bucket/BucketProtoUtils.java    |  44 ++++-
 .../io/hfile/bucket/TestVerifyBucketCacheFile.java |  47 +++++
 3 files changed, 226 insertions(+), 55 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5816b8ff160..10d0c925a47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -25,6 +25,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -122,6 +123,8 @@ public class BucketCache implements BlockCache, HeapSize {
   static final String EXTRA_FREE_FACTOR_CONFIG_NAME = 
"hbase.bucketcache.extrafreefactor";
   static final String ACCEPT_FACTOR_CONFIG_NAME = 
"hbase.bucketcache.acceptfactor";
   static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
+  /** Configuration key: maximum number of backing-map entries written per persistence chunk. */
+  static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE = "hbase.bucketcache.persistence.chunksize";
 
   /** Use strong reference for offsetLock or not */
   private static final String STRONG_REF_KEY = 
"hbase.bucketcache.offsetlock.usestrongref";
@@ -145,6 +148,8 @@ public class BucketCache implements BlockCache, HeapSize {
   final static int DEFAULT_WRITER_THREADS = 3;
   final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
 
+  /** Default for {@link #BACKING_MAP_PERSISTENCE_CHUNK_SIZE}: ten million entries per chunk. */
+  static final long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10_000_000L;
+
   // Store/read block data
   transient final IOEngine ioEngine;
 
@@ -273,6 +278,8 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private String algorithm;
 
+  /**
+   * Maximum number of backing-map entries per persistence chunk, read from
+   * {@link #BACKING_MAP_PERSISTENCE_CHUNK_SIZE}; non-positive settings fall back to
+   * {@link #DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE}.
+   */
+  private long persistenceChunkSize;
+
   /* Tracing failed Bucket Cache allocations. */
   private long allocFailLogPrevTs; // time of previous log event for 
allocation failure.
   private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 
1 minute.
@@ -313,6 +320,11 @@ public class BucketCache implements BlockCache, HeapSize {
     this.queueAdditionWaitTime =
       conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
     this.bucketcachePersistInterval = 
conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
+    this.persistenceChunkSize =
+      conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, 
DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
+    if (this.persistenceChunkSize <= 0) {
+      persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
+    }
 
     sanityCheckConfigs();
 
@@ -1358,8 +1370,8 @@ public class BucketCache implements BlockCache, HeapSize {
     }
     File tempPersistencePath = new File(persistencePath + 
EnvironmentEdgeManager.currentTime());
     try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, 
false)) {
-      fos.write(ProtobufMagic.PB_MAGIC);
-      BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
+      LOG.debug("Persist in new chunked persistence format.");
+      persistChunkedBackingMap(fos);
     } catch (IOException e) {
       LOG.error("Failed to persist bucket cache to file", e);
       throw e;
@@ -1405,16 +1417,23 @@ public class BucketCache implements BlockCache, HeapSize {
         throw new IOException("Incorrect number of bytes read while checking 
for protobuf magic "
           + "number. Requested=" + pblen + ", Received= " + read + ", File=" + 
persistencePath);
       }
-      if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
+      if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
+        LOG.info("Reading old format of persistence.");
+        // The old non-chunked version of backing map persistence.
+        parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
+      } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) {
+        // The new persistence format of chunked persistence.
+        LOG.info("Reading new chunked format of persistence.");
+        retrieveChunkedBackingMap(in, bucketSizes);
+      } else {
         // In 3.0 we have enough flexibility to dump the old cache data.
         // TODO: In 2.x line, this might need to be filled in to support 
reading the old format
         throw new IOException(
           "Persistence file does not start with protobuf magic number. " + 
persistencePath);
       }
-      parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
       bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, 
backingMap, realCacheSize);
       blockNumber.add(backingMap.size());
-      LOG.info("Bucket cache retrieved from file successfully");
+      LOG.info("Bucket cache retrieved from file successfully with size: {}", 
backingMap.size());
     }
   }
 
@@ -1457,6 +1476,75 @@ public class BucketCache implements BlockCache, HeapSize {
     }
   }
 
+  /**
+   * Checks the cache file against the checksum stored in {@code proto}.
+   * <p>
+   * When {@code proto} carries no checksum, or the checksum verifies, the backing map is marked as
+   * validated immediately. When verification throws, a daemon thread is started that waits until
+   * {@code bucketAllocator} has been assigned, runs {@code FileIOEngine#checkCacheTime} on every
+   * backing-map entry, evicts each entry that fails (and calls {@code fileNotFullyCached} for its
+   * HFile), and finally marks the backing map as validated.
+   * @param proto the persisted {@code BucketCacheEntry} holding the optional checksum
+   */
+  private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) {
+    try {
+      if (proto.hasChecksum()) {
+        ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
+          algorithm);
+      }
+      backingMapValidated.set(true);
+    } catch (IOException e) {
+      // Pass the exception to the logger so the reason for the checksum failure is not lost.
+      LOG.warn("Checksum for cache file failed. "
+        + "We need to validate each cache key in the backing map. "
+        + "This may take some time, so we'll do it in a background thread,", e);
+
+      Runnable cacheValidator = () -> {
+        // retrieveFromFile assigns bucketAllocator only after the backing map has been parsed;
+        // wait for it before validating entries.
+        while (bucketAllocator == null) {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException ex) {
+            // Restore the interrupt flag and give up; the backing map stays unvalidated.
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while waiting to validate the backing map; skipping validation.");
+            return;
+          }
+        }
+        long startTime = EnvironmentEdgeManager.currentTime();
+        int totalKeysOriginally = backingMap.size();
+        for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
+          try {
+            ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
+          } catch (IOException e1) {
+            LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
+            evictBlock(keyEntry.getKey());
+            fileNotFullyCached(keyEntry.getKey().getHfileName());
+          }
+        }
+        backingMapValidated.set(true);
+        LOG.info("Finished validating {} keys in the backing map. Recovered: {}. This took {}ms.",
+          totalKeysOriginally, backingMap.size(),
+          (EnvironmentEdgeManager.currentTime() - startTime));
+      };
+      Thread t = new Thread(cacheValidator, "BucketCacheValidator");
+      t.setDaemon(true);
+      t.start();
+    }
+  }
+
+  /**
+   * Rebuilds the in-memory cache state from the chunked (V2) persistence format.
+   * <p>
+   * The backing-map entries of {@code firstChunk} and of every element of {@code chunks} are
+   * merged into {@code backingMap} and {@code blocksByHFile}; {@code fullyCachedFiles} is replaced
+   * by the map stored in {@code firstChunk}. File integrity, cache capacity and classes are then
+   * verified against the metadata in {@code firstChunk}, and the region size map is updated.
+   * @param firstChunk the first persisted message: cache metadata plus the first backing-map slice
+   * @param chunks     the remaining backing-map slices, in file order
+   * @throws IOException if a chunk is missing, or if the capacity/class check fails
+   */
+  private void parsePB(BucketCacheProtos.BucketCacheEntry firstChunk,
+    List<BucketCacheProtos.BackingMap> chunks) throws IOException {
+    if (firstChunk == null) {
+      throw new IOException("Persistence file is missing the first backing map chunk.");
+    }
+    fullyCachedFiles.clear();
+    Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
+      BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), firstChunk.getBackingMap(),
+        this::createRecycler);
+    backingMap.putAll(pair.getFirst());
+    blocksByHFile.addAll(pair.getSecond());
+    fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap()));
+
+    LOG.debug("Number of blocks after first chunk: {}, fullyCachedFiles: {}", backingMap.size(),
+      fullyCachedFiles.size());
+    int chunkNumber = 1;
+    for (BucketCacheProtos.BackingMap chunk : chunks) {
+      chunkNumber++;
+      if (chunk == null) {
+        throw new IOException(
+          "Backing map chunk " + chunkNumber + " is missing from the persistence file.");
+      }
+      Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> chunkPair =
+        BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk, this::createRecycler);
+      backingMap.putAll(chunkPair.getFirst());
+      blocksByHFile.addAll(chunkPair.getSecond());
+      LOG.debug("Number of blocks after {} reading chunk: {}, fullyCachedFiles: {}", chunkNumber,
+        backingMap.size(), fullyCachedFiles.size());
+    }
+    verifyFileIntegrity(firstChunk);
+    verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(),
+      firstChunk.getMapClass());
+    updateRegionSizeMapWhileRetrievingFromFile();
+  }
+
   private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws 
IOException {
     Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, 
NavigableSet<BlockCacheKey>> pair =
       BucketProtoUtils.fromPB(proto.getDeserializersMap(), 
proto.getBackingMap(),
@@ -1465,52 +1553,60 @@ public class BucketCache implements BlockCache, HeapSize {
     blocksByHFile = pair.getSecond();
     fullyCachedFiles.clear();
     
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
-    if (proto.hasChecksum()) {
-      try {
-        ((PersistentIOEngine) 
ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
-          algorithm);
-        backingMapValidated.set(true);
-      } catch (IOException e) {
-        LOG.warn("Checksum for cache file failed. "
-          + "We need to validate each cache key in the backing map. "
-          + "This may take some time, so we'll do it in a background thread,");
-        Runnable cacheValidator = () -> {
-          while (bucketAllocator == null) {
-            try {
-              Thread.sleep(50);
-            } catch (InterruptedException ex) {
-              throw new RuntimeException(ex);
-            }
-          }
-          long startTime = EnvironmentEdgeManager.currentTime();
-          int totalKeysOriginally = backingMap.size();
-          for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : 
backingMap.entrySet()) {
-            try {
-              ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
-            } catch (IOException e1) {
-              LOG.debug("Check for key {} failed. Evicting.", 
keyEntry.getKey());
-              evictBlock(keyEntry.getKey());
-              fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
-            }
-          }
-          backingMapValidated.set(true);
-          LOG.info("Finished validating {} keys in the backing map. Recovered: 
{}. This took {}ms.",
-            totalKeysOriginally, backingMap.size(),
-            (EnvironmentEdgeManager.currentTime() - startTime));
-        };
-        Thread t = new Thread(cacheValidator);
-        t.setDaemon(true);
-        t.start();
-      }
-    } else {
-      // if has not checksum, it means the persistence file is old format
-      LOG.info("Persistent file is old format, it does not support verifying 
file integrity!");
-      backingMapValidated.set(true);
-    }
+    verifyFileIntegrity(proto);
     updateRegionSizeMapWhileRetrievingFromFile();
     verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), 
proto.getMapClass());
   }
 
+  /**
+   * Persists the cache state to {@code fos} in the chunked (V2) format via
+   * {@link BucketProtoUtils#serializeAsPB}, splitting the backing map into chunks of at most
+   * {@code persistenceChunkSize} entries.
+   * @param fos stream of the temporary persistence file
+   * @throws IOException if writing to {@code fos} fails
+   */
+  private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
+    // Sample the size of the concurrent map once, so the chunk count and the log line below
+    // describe the same snapshot.
+    int backingMapSize = backingMap.size();
+    long numChunks = backingMapSize / persistenceChunkSize;
+    // Round up for a partial last chunk, and never request zero chunks: the first chunk carries
+    // the BucketCacheEntry metadata that retrieval needs even when the backing map is empty.
+    if (numChunks == 0 || backingMapSize % persistenceChunkSize != 0) {
+      numChunks += 1;
+    }
+
+    LOG.debug(
+      "persistToFile: before persisting backing map size: {}, "
+        + "fullycachedFiles size: {}, chunkSize: {}, numberofChunks: {}",
+      backingMapSize, fullyCachedFiles.size(), persistenceChunkSize, numChunks);
+
+    BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize, numChunks);
+
+    LOG.debug(
+      "persistToFile: after persisting backing map size: {}, "
+        + "fullycachedFiles size: {}, numChunksPersisted: {}",
+      backingMap.size(), fullyCachedFiles.size(), numChunks);
+  }
+
+  /**
+   * Reads the chunked (V2) persistence format that follows the {@code PB_MAGIC_V2} prefix: the
+   * chunk size and the chunk count (each a long written with {@code Bytes.toBytes(long)}), the
+   * first chunk as a delimited {@code BucketCacheEntry}, then {@code numChunks - 1} delimited
+   * {@code BackingMap} chunks, and rebuilds the cache state from them.
+   * @param in          the persistence file stream, positioned just after the magic prefix
+   * @param bucketSizes configured bucket sizes (not used by this method)
+   * @throws IOException if the stream is truncated or malformed
+   */
+  private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException {
+    long batchSize =
+      readPersistedLong(in, "Invalid size of chunk-size read from persistence: ");
+    long numChunks =
+      readPersistedLong(in, "Invalid size for number of chunks read from persistence: ");
+    // The writer always emits at least the BucketCacheEntry chunk.
+    if (numChunks < 1) {
+      throw new IOException("Invalid number of chunks read from persistence: " + numChunks);
+    }
+
+    LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize);
+
+    // Read the first chunk that has all the details.
+    BucketCacheProtos.BucketCacheEntry firstChunk =
+      BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
+    if (firstChunk == null) {
+      // parseDelimitedFrom returns null at end of stream.
+      throw new IOException("Persistence file ended before the first chunk could be read.");
+    }
+
+    // Subsequent chunks have the backingMap entries.
+    List<BucketCacheProtos.BackingMap> bucketCacheMaps = new ArrayList<>();
+    for (long i = 1; i < numChunks; i++) {
+      LOG.info("Reading chunk no: {}", i + 1);
+      BucketCacheProtos.BackingMap chunk = BucketCacheProtos.BackingMap.parseDelimitedFrom(in);
+      if (chunk == null) {
+        throw new IOException(
+          "Persistence file ended after " + i + " of " + numChunks + " chunks were read.");
+      }
+      bucketCacheMaps.add(chunk);
+      LOG.info("Retrieved chunk: {}", i + 1);
+    }
+    parsePB(firstChunk, bucketCacheMaps);
+  }
+
+  /**
+   * Reads exactly {@code Long.BYTES} bytes from {@code in} and decodes them with
+   * {@code Bytes.toLong}. Loops because a single {@link FileInputStream#read(byte[])} call may
+   * return fewer bytes than requested.
+   * @param in           stream to read from
+   * @param errorMessage message prefix used if the stream ends early; the byte count is appended
+   * @return the decoded long
+   * @throws IOException if the stream ends before {@code Long.BYTES} bytes are read
+   */
+  private static long readPersistedLong(FileInputStream in, String errorMessage)
+    throws IOException {
+    byte[] bytes = new byte[Long.BYTES];
+    int total = 0;
+    while (total < bytes.length) {
+      int n = in.read(bytes, total, bytes.length - total);
+      if (n < 0) {
+        break;
+      }
+      total += n;
+    }
+    if (total != Long.BYTES) {
+      throw new IOException(errorMessage + total);
+    }
+    return Bytes.toLong(bytes, 0);
+  }
+
   /**
    * Check whether we tolerate IO error this time. If the duration of IOEngine 
throwing errors
    * exceeds ioErrorsDurationTimeTolerated, we will disable the cache
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index 4b42414fb9c..4618200325c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockPriority;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -41,29 +43,55 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
 
 @InterfaceAudience.Private
 final class BucketProtoUtils {
+
+  /** Magic prefix marking a persistence file written in the chunked (V2) format. */
+  static final byte[] PB_MAGIC_V2 = { 'V', '2', 'U', 'F' };
+
   private BucketProtoUtils() {
 
   }
 
-  static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) {
-    return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize())
-      .setIoClass(cache.ioEngine.getClass().getName())
-      .setMapClass(cache.backingMap.getClass().getName())
-      .putAllDeserializers(CacheableDeserializerIdManager.save())
-      .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles))
-      .setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
-      .setChecksum(ByteString
-        .copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
-      .build();
-  }
+  /**
+   * Builds the {@code BucketCacheEntry} that forms the first chunk of the persistence file. It
+   * holds the cache capacity, the IO engine and backing-map class names, the registered
+   * deserializers, the fully cached files, the given backing-map slice, and the checksum computed
+   * by the persistent IO engine using the cache's algorithm.
+   * @param cache      the cache being persisted
+   * @param backingMap the backing-map entries to embed in this entry
+   * @return the assembled entry
+   */
+  static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache,
+    BucketCacheProtos.BackingMap backingMap) {
+    BucketCacheProtos.BucketCacheEntry.Builder entry =
+      BucketCacheProtos.BucketCacheEntry.newBuilder();
+    entry.setCacheCapacity(cache.getMaxSize());
+    entry.setIoClass(cache.ioEngine.getClass().getName());
+    entry.setMapClass(cache.backingMap.getClass().getName());
+    entry.putAllDeserializers(CacheableDeserializerIdManager.save());
+    entry.putAllCachedFiles(toCachedPB(cache.fullyCachedFiles));
+    entry.setBackingMap(backingMap);
+    PersistentIOEngine engine = (PersistentIOEngine) cache.ioEngine;
+    entry.setChecksum(ByteString.copyFrom(engine.calculateChecksum(cache.getAlgorithm())));
+    return entry.build();
+  }
 
-  private static BucketCacheProtos.BackingMap toPB(Map<BlockCacheKey, BucketEntry> backingMap) {
-    BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
-    for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
-      builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder().setKey(toPB(entry.getKey()))
-        .setValue(toPB(entry.getValue())).build());
-    }
-    return builder.build();
-  }
+  /**
+   * Serializes the state of {@code cache} to {@code fos} in the chunked (V2) persistence format.
+   * The output is {@link #PB_MAGIC_V2}, then the chunk size and the chunk count (each written with
+   * {@code Bytes.toBytes(long)}), then that many length-delimited protobuf messages. The first
+   * message is a complete {@code BucketCacheEntry} (metadata plus the first backing-map slice);
+   * each following message is a bare {@code BackingMap} slice.
+   * <p>
+   * The backing map may change while it is being iterated, so the chunk count is fixed before
+   * iteration and exactly that many messages are written:
+   * <ul>
+   * <li>every chunk except the last holds {@code chunkSize} entries;</li>
+   * <li>the last chunk takes whatever the iteration still yields;</li>
+   * <li>chunks left over because the map shrank are written empty.</li>
+   * </ul>
+   * At least one chunk is always written, so a reader finds the {@code BucketCacheEntry} even for
+   * an empty backing map.
+   * @param cache     the cache to persist
+   * @param fos       destination stream
+   * @param chunkSize maximum number of entries per chunk (except the last); must be positive
+   * @param numChunks number of chunks to write; values below 1 are treated as 1
+   * @throws IOException if writing to {@code fos} fails
+   */
+  public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize,
+    long numChunks) throws IOException {
+    if (chunkSize <= 0) {
+      throw new IllegalArgumentException("Invalid persistence chunk size: " + chunkSize);
+    }
+    long chunksToWrite = Math.max(1L, numChunks);
+
+    fos.write(PB_MAGIC_V2);
+    fos.write(Bytes.toBytes(chunkSize));
+    fos.write(Bytes.toBytes(chunksToWrite));
+
+    long chunksWritten = 0;
+    long entriesInChunk = 0;
+    BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
+    for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
+      // Flush a full chunk unless it is the last one: the last chunk absorbs any entries added
+      // after numChunks was computed, so the count in the header stays accurate.
+      if (entriesInChunk == chunkSize && chunksWritten < chunksToWrite - 1) {
+        writeChunk(cache, builder.build(), chunksWritten == 0, fos);
+        chunksWritten++;
+        builder = BucketCacheProtos.BackingMap.newBuilder();
+        entriesInChunk = 0;
+      }
+      builder.addEntry(
+        BucketCacheProtos.BackingMapEntry.newBuilder().setKey(BucketProtoUtils.toPB(entry.getKey()))
+          .setValue(BucketProtoUtils.toPB(entry.getValue())).build());
+      entriesInChunk++;
+    }
+    // Flush the current (possibly empty) chunk, then pad with empty chunks if entries were
+    // removed during iteration, so the reader finds exactly chunksToWrite messages.
+    while (chunksWritten < chunksToWrite) {
+      writeChunk(cache, builder.build(), chunksWritten == 0, fos);
+      chunksWritten++;
+      builder = BucketCacheProtos.BackingMap.newBuilder();
+    }
+  }
+
+  /**
+   * Writes one length-delimited chunk to {@code fos}. The first chunk is wrapped in a full
+   * {@code BucketCacheEntry} carrying the cache metadata; later chunks are bare
+   * {@code BackingMap} messages.
+   */
+  private static void writeChunk(BucketCache cache, BucketCacheProtos.BackingMap chunk,
+    boolean firstChunk, FileOutputStream fos) throws IOException {
+    if (firstChunk) {
+      toPB(cache, chunk).writeDelimitedTo(fos);
+    } else {
+      chunk.writeDelimitedTo(fos);
+    }
+  }
 
   private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index 0d33fb079bc..b49a2b1db8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static 
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
 import static 
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -350,6 +351,52 @@ public class TestVerifyBucketCacheFile {
     TEST_UTIL.cleanupTestDir();
   }
 
+  /** A backing map no larger than the chunk size is persisted and recovered as one chunk. */
+  @Test
+  public void testSingleChunk() throws Exception {
+    final int chunkSize = 5;
+    testChunkedBackingMapRecovery(chunkSize, chunkSize);
+  }
+
+  /** A backing map spanning two full chunks is persisted and recovered completely. */
+  @Test
+  public void testMultipleChunks() throws Exception {
+    final int chunkSize = 5;
+    testChunkedBackingMapRecovery(chunkSize, 2 * chunkSize);
+  }
+
+  /**
+   * Caches {@code numBlocks} blocks with {@code BACKING_MAP_PERSISTENCE_CHUNK_SIZE} set to
+   * {@code chunkSize} and persists the cache. Then asserts that a second cache, built over the
+   * same cache and persistence files, recovers every block.
+   * @param chunkSize persistence chunk size to configure
+   * @param numBlocks number of blocks to cache and expect back after recovery
+   */
+  private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
+    HBaseTestingUtil testUtil = new HBaseTestingUtil();
+    Path testDir = testUtil.getDataTestDir();
+    testUtil.getTestFileSystem().mkdirs(testDir);
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);
+
+    String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
+    BucketCache bucketCache = null;
+    BucketCache newBucketCache = null;
+    try {
+      bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+      CacheTestUtils.HFileBlockPair[] blocks =
+        CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
+      for (int i = 0; i < numBlocks; i++) {
+        cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(),
+          blocks[i].getBlock());
+      }
+
+      // saves the current state
+      bucketCache.persistToFile();
+
+      // Create a new bucket which reads from persistence file.
+      newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+        DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+      assertEquals(numBlocks, newBucketCache.backingMap.size());
+      for (int i = 0; i < numBlocks; i++) {
+        assertEquals(blocks[i].getBlock(),
+          newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
+      }
+    } finally {
+      // Shut both caches down so they do not outlive the test, and always clean up the test
+      // directory, even when an assertion above fails.
+      if (newBucketCache != null) {
+        newBucketCache.shutdown();
+      }
+      if (bucketCache != null) {
+        bucketCache.shutdown();
+      }
+      testUtil.cleanupTestDir();
+    }
+  }
+
   private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey 
cacheKey)
     throws InterruptedException {
     while (!cache.backingMap.containsKey(cacheKey) || 
cache.ramCache.containsKey(cacheKey)) {

Reply via email to