This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new 44cb5009045 HBASE-28805: Chunked persistence of backing map for persistent bucket cache. (#6183)
44cb5009045 is described below
commit 44cb5009045b3291532c64c19efb1a8750074725
Author: jhungund <[email protected]>
AuthorDate: Tue Sep 3 21:18:14 2024 +0530
HBASE-28805: Chunked persistence of backing map for persistent bucket cache. (#6183)
Signed-off-by: Wellington Chevreuil <[email protected]>
(cherry picked from commit 84655ded27bbd6d8aada9707d06f23c65d2c4746)
---
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 190 ++++++++++++++++-----
.../hbase/io/hfile/bucket/BucketProtoUtils.java | 44 ++++-
.../io/hfile/bucket/TestVerifyBucketCacheFile.java | 47 +++++
3 files changed, 226 insertions(+), 55 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 5816b8ff160..10d0c925a47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -25,6 +25,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -122,6 +123,8 @@ public class BucketCache implements BlockCache, HeapSize {
static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
+ static final String BACKING_MAP_PERSISTENCE_CHUNK_SIZE =
+ "hbase.bucketcache.persistence.chunksize";
/** Use strong reference for offsetLock or not */
private static final String STRONG_REF_KEY = "hbase.bucketcache.offsetlock.usestrongref";
@@ -145,6 +148,8 @@ public class BucketCache implements BlockCache, HeapSize {
final static int DEFAULT_WRITER_THREADS = 3;
final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
+ final static long DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE = 10000000;
+
// Store/read block data
transient final IOEngine ioEngine;
@@ -273,6 +278,8 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private String algorithm;
+ private long persistenceChunkSize;
+
/* Tracing failed Bucket Cache allocations. */
private long allocFailLogPrevTs; // time of previous log event for allocation failure.
private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default 1 minute.
@@ -313,6 +320,11 @@ public class BucketCache implements BlockCache, HeapSize {
this.queueAdditionWaitTime =
conf.getLong(QUEUE_ADDITION_WAIT_TIME, DEFAULT_QUEUE_ADDITION_WAIT_TIME);
this.bucketcachePersistInterval = conf.getLong(BUCKETCACHE_PERSIST_INTERVAL_KEY, 1000);
+ this.persistenceChunkSize =
+ conf.getLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE);
+ if (this.persistenceChunkSize <= 0) {
+ persistenceChunkSize = DEFAULT_BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
+ }
sanityCheckConfigs();
@@ -1358,8 +1370,8 @@ public class BucketCache implements BlockCache, HeapSize {
}
File tempPersistencePath = new File(persistencePath + EnvironmentEdgeManager.currentTime());
try (FileOutputStream fos = new FileOutputStream(tempPersistencePath, false)) {
- fos.write(ProtobufMagic.PB_MAGIC);
- BucketProtoUtils.toPB(this).writeDelimitedTo(fos);
+ LOG.debug("Persist in new chunked persistence format.");
+ persistChunkedBackingMap(fos);
} catch (IOException e) {
LOG.error("Failed to persist bucket cache to file", e);
throw e;
@@ -1405,16 +1417,23 @@ public class BucketCache implements BlockCache, HeapSize {
throw new IOException("Incorrect number of bytes read while checking
for protobuf magic "
+ "number. Requested=" + pblen + ", Received= " + read + ", File=" +
persistencePath);
}
- if (!ProtobufMagic.isPBMagicPrefix(pbuf)) {
+ if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
+ LOG.info("Reading old format of persistence.");
+ // The old non-chunked version of backing map persistence.
+ parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
+ } else if (Arrays.equals(pbuf, BucketProtoUtils.PB_MAGIC_V2)) {
+ // The new persistence format of chunked persistence.
+ LOG.info("Reading new chunked format of persistence.");
+ retrieveChunkedBackingMap(in, bucketSizes);
+ } else {
// In 3.0 we have enough flexibility to dump the old cache data.
// TODO: In 2.x line, this might need to be filled in to support reading the old format
throw new IOException(
"Persistence file does not start with protobuf magic number. " +
persistencePath);
}
- parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in));
bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
blockNumber.add(backingMap.size());
- LOG.info("Bucket cache retrieved from file successfully");
+ LOG.info("Bucket cache retrieved from file successfully with size: {}",
backingMap.size());
}
}
@@ -1457,6 +1476,75 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
+ private void verifyFileIntegrity(BucketCacheProtos.BucketCacheEntry proto) {
+ try {
+ if (proto.hasChecksum()) {
+ ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
+ algorithm);
+ }
+ backingMapValidated.set(true);
+ } catch (IOException e) {
+ LOG.warn("Checksum for cache file failed. "
+ + "We need to validate each cache key in the backing map. "
+ + "This may take some time, so we'll do it in a background thread,");
+
+ Runnable cacheValidator = () -> {
+ while (bucketAllocator == null) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ long startTime = EnvironmentEdgeManager.currentTime();
+ int totalKeysOriginally = backingMap.size();
+ for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
+ try {
+ ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
+ } catch (IOException e1) {
+ LOG.debug("Check for key {} failed. Evicting.", keyEntry.getKey());
+ evictBlock(keyEntry.getKey());
+ fileNotFullyCached(keyEntry.getKey().getHfileName());
+ }
+ }
+ backingMapValidated.set(true);
+ LOG.info("Finished validating {} keys in the backing map. Recovered:
{}. This took {}ms.",
+ totalKeysOriginally, backingMap.size(),
+ (EnvironmentEdgeManager.currentTime() - startTime));
+ };
+ Thread t = new Thread(cacheValidator);
+ t.setDaemon(true);
+ t.start();
+ }
+ }
+
+ private void parsePB(BucketCacheProtos.BucketCacheEntry firstChunk,
+ List<BucketCacheProtos.BackingMap> chunks) throws IOException {
+ fullyCachedFiles.clear();
+ Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
+ BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), firstChunk.getBackingMap(),
+ this::createRecycler);
+ backingMap.putAll(pair.getFirst());
+ blocksByHFile.addAll(pair.getSecond());
+ fullyCachedFiles.putAll(BucketProtoUtils.fromPB(firstChunk.getCachedFilesMap()));
+
+ LOG.debug("Number of blocks after first chunk: {}, blocksByHFile: {}",
backingMap.size(),
+ fullyCachedFiles.size());
+ int i = 1;
+ for (BucketCacheProtos.BackingMap chunk : chunks) {
+ Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair2 =
+ BucketProtoUtils.fromPB(firstChunk.getDeserializersMap(), chunk, this::createRecycler);
+ backingMap.putAll(pair2.getFirst());
+ blocksByHFile.addAll(pair2.getSecond());
+ LOG.debug("Number of blocks after {} reading chunk: {}, blocksByHFile:
{}", ++i,
+ backingMap.size(), fullyCachedFiles.size());
+ }
+ verifyFileIntegrity(firstChunk);
+ verifyCapacityAndClasses(firstChunk.getCacheCapacity(), firstChunk.getIoClass(),
+ firstChunk.getMapClass());
+ updateRegionSizeMapWhileRetrievingFromFile();
+ }
+
private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException {
Pair<ConcurrentHashMap<BlockCacheKey, BucketEntry>, NavigableSet<BlockCacheKey>> pair =
BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap(),
@@ -1465,52 +1553,60 @@ public class BucketCache implements BlockCache, HeapSize {
blocksByHFile = pair.getSecond();
fullyCachedFiles.clear();
fullyCachedFiles.putAll(BucketProtoUtils.fromPB(proto.getCachedFilesMap()));
- if (proto.hasChecksum()) {
- try {
- ((PersistentIOEngine) ioEngine).verifyFileIntegrity(proto.getChecksum().toByteArray(),
- algorithm);
- backingMapValidated.set(true);
- } catch (IOException e) {
- LOG.warn("Checksum for cache file failed. "
- + "We need to validate each cache key in the backing map. "
- + "This may take some time, so we'll do it in a background thread,");
- Runnable cacheValidator = () -> {
- while (bucketAllocator == null) {
- try {
- Thread.sleep(50);
- } catch (InterruptedException ex) {
- throw new RuntimeException(ex);
- }
- }
- long startTime = EnvironmentEdgeManager.currentTime();
- int totalKeysOriginally = backingMap.size();
- for (Map.Entry<BlockCacheKey, BucketEntry> keyEntry : backingMap.entrySet()) {
- try {
- ((FileIOEngine) ioEngine).checkCacheTime(keyEntry.getValue());
- } catch (IOException e1) {
- LOG.debug("Check for key {} failed. Evicting.",
keyEntry.getKey());
- evictBlock(keyEntry.getKey());
- fullyCachedFiles.remove(keyEntry.getKey().getHfileName());
- }
- }
- backingMapValidated.set(true);
- LOG.info("Finished validating {} keys in the backing map. Recovered:
{}. This took {}ms.",
- totalKeysOriginally, backingMap.size(),
- (EnvironmentEdgeManager.currentTime() - startTime));
- };
- Thread t = new Thread(cacheValidator);
- t.setDaemon(true);
- t.start();
- }
- } else {
- // if has not checksum, it means the persistence file is old format
- LOG.info("Persistent file is old format, it does not support verifying
file integrity!");
- backingMapValidated.set(true);
- }
+ verifyFileIntegrity(proto);
updateRegionSizeMapWhileRetrievingFromFile();
verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
}
+ private void persistChunkedBackingMap(FileOutputStream fos) throws IOException {
+ long numChunks = backingMap.size() / persistenceChunkSize;
+ if (backingMap.size() % persistenceChunkSize != 0) {
+ numChunks += 1;
+ }
+
+ LOG.debug(
+ "persistToFile: before persisting backing map size: {}, "
+ + "fullycachedFiles size: {}, chunkSize: {}, numberofChunks: {}",
+ backingMap.size(), fullyCachedFiles.size(), persistenceChunkSize, numChunks);
+
+ BucketProtoUtils.serializeAsPB(this, fos, persistenceChunkSize, numChunks);
+
+ LOG.debug(
+ "persistToFile: after persisting backing map size: {}, "
+ + "fullycachedFiles size: {}, numChunksPersisteed: {}",
+ backingMap.size(), fullyCachedFiles.size(), numChunks);
+ }
+
+ private void retrieveChunkedBackingMap(FileInputStream in, int[] bucketSizes) throws IOException {
+ byte[] bytes = new byte[Long.BYTES];
+ int readSize = in.read(bytes);
+ if (readSize != Long.BYTES) {
+ throw new IOException("Invalid size of chunk-size read from persistence:
" + readSize);
+ }
+ long batchSize = Bytes.toLong(bytes, 0);
+
+ readSize = in.read(bytes);
+ if (readSize != Long.BYTES) {
+ throw new IOException("Invalid size for number of chunks read from
persistence: " + readSize);
+ }
+ long numChunks = Bytes.toLong(bytes, 0);
+
+ LOG.info("Number of chunks: {}, chunk size: {}", numChunks, batchSize);
+
+ ArrayList<BucketCacheProtos.BackingMap> bucketCacheMaps = new ArrayList<>();
+ // Read the first chunk that has all the details.
+ BucketCacheProtos.BucketCacheEntry firstChunk =
+ BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
+
+ // Subsequent chunks have the backingMap entries.
+ for (int i = 1; i < numChunks; i++) {
+ LOG.info("Reading chunk no: {}", i + 1);
+ bucketCacheMaps.add(BucketCacheProtos.BackingMap.parseDelimitedFrom(in));
+ LOG.info("Retrieved chunk: {}", i + 1);
+ }
+ parsePB(firstChunk, bucketCacheMaps);
+ }
+
/**
* Check whether we tolerate IO error this time. If the duration of IOEngine throwing errors
* exceeds ioErrorsDurationTimeTolerated, we will disable the cache
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
index 4b42414fb9c..4618200325c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
@@ -41,29 +43,55 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
@InterfaceAudience.Private
final class BucketProtoUtils {
+
+ final static byte[] PB_MAGIC_V2 = new byte[] { 'V', '2', 'U', 'F' };
+
private BucketProtoUtils() {
}
- static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) {
+ static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache,
+ BucketCacheProtos.BackingMap backingMap) {
return BucketCacheProtos.BucketCacheEntry.newBuilder().setCacheCapacity(cache.getMaxSize())
.setIoClass(cache.ioEngine.getClass().getName())
.setMapClass(cache.backingMap.getClass().getName())
.putAllDeserializers(CacheableDeserializerIdManager.save())
- .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles))
- .setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
+ .putAllCachedFiles(toCachedPB(cache.fullyCachedFiles)).setBackingMap(backingMap)
.setChecksum(ByteString
.copyFrom(((PersistentIOEngine) cache.ioEngine).calculateChecksum(cache.getAlgorithm())))
.build();
}
- private static BucketCacheProtos.BackingMap toPB(Map<BlockCacheKey, BucketEntry> backingMap) {
+ public static void serializeAsPB(BucketCache cache, FileOutputStream fos, long chunkSize,
+ long numChunks) throws IOException {
+ int blockCount = 0;
+ int chunkCount = 0;
+ int backingMapSize = cache.backingMap.size();
BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder();
- for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
- builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder().setKey(toPB(entry.getKey()))
- .setValue(toPB(entry.getValue())).build());
+
+ fos.write(PB_MAGIC_V2);
+ fos.write(Bytes.toBytes(chunkSize));
+ fos.write(Bytes.toBytes(numChunks));
+
+ for (Map.Entry<BlockCacheKey, BucketEntry> entry : cache.backingMap.entrySet()) {
+ blockCount++;
+ builder.addEntry(
+ BucketCacheProtos.BackingMapEntry.newBuilder().setKey(BucketProtoUtils.toPB(entry.getKey()))
+ .setValue(BucketProtoUtils.toPB(entry.getValue())).build());
+ if (blockCount % chunkSize == 0 || (blockCount == backingMapSize)) {
+ chunkCount++;
+ if (chunkCount == 1) {
+ // Persist all details along with the first chunk into BucketCacheEntry
+ BucketProtoUtils.toPB(cache, builder.build()).writeDelimitedTo(fos);
+ } else {
+ // Directly persist subsequent backing-map chunks.
+ builder.build().writeDelimitedTo(fos);
+ }
+ if (blockCount < backingMapSize) {
+ builder = BucketCacheProtos.BackingMap.newBuilder();
+ }
+ }
}
- return builder.build();
}
private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
index 0d33fb079bc..b49a2b1db8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestVerifyBucketCacheFile.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.io.hfile.bucket;
import static org.apache.hadoop.hbase.io.hfile.CacheConfig.BUCKETCACHE_PERSIST_INTERVAL_KEY;
+import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BACKING_MAP_PERSISTENCE_CHUNK_SIZE;
import static org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.DEFAULT_ERROR_TOLERATION_DURATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -350,6 +351,52 @@ public class TestVerifyBucketCacheFile {
TEST_UTIL.cleanupTestDir();
}
+ @Test
+ public void testSingleChunk() throws Exception {
+ testChunkedBackingMapRecovery(5, 5);
+ }
+
+ @Test
+ public void testMultipleChunks() throws Exception {
+ testChunkedBackingMapRecovery(5, 10);
+ }
+
+ private void testChunkedBackingMapRecovery(int chunkSize, int numBlocks) throws Exception {
+ HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+ Path testDir = TEST_UTIL.getDataTestDir();
+ TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+ Configuration conf = HBaseConfiguration.create();
+ conf.setLong(BACKING_MAP_PERSISTENCE_CHUNK_SIZE, chunkSize);
+
+ String mapFileName = testDir + "/bucket.persistence" + EnvironmentEdgeManager.currentTime();
+ BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+ CacheTestUtils.HFileBlockPair[] blocks =
+ CacheTestUtils.generateHFileBlocks(constructedBlockSize, numBlocks);
+
+ for (int i = 0; i < numBlocks; i++) {
+ cacheAndWaitUntilFlushedToBucket(bucketCache, blocks[i].getBlockName(), blocks[i].getBlock());
+ }
+
+ // saves the current state
+ bucketCache.persistToFile();
+
+ // Create a new bucket which reads from persistence file.
+ BucketCache newBucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+ constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, mapFileName,
+ DEFAULT_ERROR_TOLERATION_DURATION, conf);
+
+ assertEquals(numBlocks, newBucketCache.backingMap.size());
+
+ for (int i = 0; i < numBlocks; i++) {
+ assertEquals(blocks[i].getBlock(),
+ newBucketCache.getBlock(blocks[i].getBlockName(), false, false, false));
+ }
+ TEST_UTIL.cleanupTestDir();
+ }
+
private void waitUntilFlushedToBucket(BucketCache cache, BlockCacheKey cacheKey)
throws InterruptedException {
while (!cache.backingMap.containsKey(cacheKey) || cache.ramCache.containsKey(cacheKey)) {