Repository: hbase

Updated Branches:
  refs/heads/branch-1 e1aab356b -> b694b63ed
HBASE-16460 Can't rebuild the BucketAllocator's data structures when
BucketCache uses FileIOEngine (Guanghao Zhang)

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b694b63e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b694b63e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b694b63e

Branch: refs/heads/branch-1
Commit: b694b63ed7ec9275a5ada77739e836e36853de8b
Parents: e1aab35
Author: tedyu <yuzhih...@gmail.com>
Authored: Mon Sep 5 06:52:03 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Mon Sep 5 06:52:03 2016 -0700

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/BucketAllocator.java  | 34 ++++++++++----
 .../hbase/io/hfile/bucket/BucketCache.java      |  7 +--
 .../hadoop/hbase/io/hfile/CacheTestUtils.java   | 15 +++++-
 .../hbase/io/hfile/bucket/TestBucketCache.java  | 48 ++++++++++++++++++++
 4 files changed, 90 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b694b63e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index fedfd20..4777607 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -350,25 +351,31 @@ public final class BucketAllocator {
     // we've found. we can only reconfigure each bucket once; if more than once,
     // we know there's a bug, so we just log the info, throw, and start again...
     boolean[] reconfigured = new boolean[buckets.length];
-    for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
+    int sizeNotMatchedCount = 0;
+    int insufficientCapacityCount = 0;
+    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = map.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
       long foundOffset = entry.getValue().offset();
       int foundLen = entry.getValue().getLength();
       int bucketSizeIndex = -1;
-      for (int i = 0; i < bucketSizes.length; ++i) {
-        if (foundLen <= bucketSizes[i]) {
+      for (int i = 0; i < this.bucketSizes.length; ++i) {
+        if (foundLen <= this.bucketSizes[i]) {
           bucketSizeIndex = i;
           break;
         }
       }
       if (bucketSizeIndex == -1) {
-        throw new BucketAllocatorException(
-            "Can't match bucket size for the block with size " + foundLen);
+        sizeNotMatchedCount++;
+        iterator.remove();
+        continue;
       }
       int bucketNo = (int) (foundOffset / bucketCapacity);
-      if (bucketNo < 0 || bucketNo >= buckets.length)
-        throw new BucketAllocatorException("Can't find bucket " + bucketNo
-            + ", total buckets=" + buckets.length
-            + "; did you shrink the cache?");
+      if (bucketNo < 0 || bucketNo >= buckets.length) {
+        insufficientCapacityCount++;
+        iterator.remove();
+        continue;
+      }
       Bucket b = buckets[bucketNo];
       if (reconfigured[bucketNo]) {
         if (b.sizeIndex() != bucketSizeIndex)
@@ -391,6 +398,15 @@ public final class BucketAllocator {
       usedSize += buckets[bucketNo].getItemAllocationSize();
       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
     }
+
+    if (sizeNotMatchedCount > 0) {
+      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be rebuilt because "
+          + "there is no matching bucket size for these blocks");
+    }
+    if (insufficientCapacityCount > 0) {
+      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't be rebuilt - "
+          + "did you shrink the cache?");
+    }
   }
 
   public String toString() {
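The heart of the change above: when rebuilding from a persisted file, an entry whose
block size no longer matches any configured bucket size, or whose offset falls outside
the current cache, is now dropped from the backing map and counted, instead of aborting
the whole rebuild with a BucketAllocatorException. A minimal, self-contained sketch of
that skip-and-count pattern (the CachedBlock class and all names here are illustrative
stand-ins, not HBase API):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class TolerantRebuildSketch {
      // Illustrative stand-in for a persisted cache entry (offset + length).
      static final class CachedBlock {
        final long offset;
        final int length;
        CachedBlock(long offset, int length) { this.offset = offset; this.length = length; }
      }

      // Drops entries that no longer fit the configured bucket sizes or capacity
      // instead of aborting the whole rebuild, mirroring the patch above.
      static void rebuild(Map<String, CachedBlock> map, int[] bucketSizes,
          long bucketCapacity, int bucketCount) {
        int sizeNotMatched = 0;
        int insufficientCapacity = 0;
        Iterator<Map.Entry<String, CachedBlock>> it = map.entrySet().iterator();
        while (it.hasNext()) {
          CachedBlock b = it.next().getValue();
          int sizeIndex = -1;
          for (int i = 0; i < bucketSizes.length; ++i) {
            if (b.length <= bucketSizes[i]) { sizeIndex = i; break; }
          }
          if (sizeIndex == -1) {          // no bucket size can hold this block
            sizeNotMatched++;
            it.remove();
            continue;
          }
          int bucketNo = (int) (b.offset / bucketCapacity);
          if (bucketNo < 0 || bucketNo >= bucketCount) { // cache was shrunk
            insufficientCapacity++;
            it.remove();
            continue;
          }
          // ... reconfigure bucket[bucketNo] for sizeIndex, as in the real code ...
        }
        if (sizeNotMatched > 0) {
          System.err.println(sizeNotMatched + " blocks dropped: no matching bucket size");
        }
        if (insufficientCapacity > 0) {
          System.err.println(insufficientCapacity + " blocks dropped: did you shrink the cache?");
        }
      }

      public static void main(String[] args) {
        Map<String, CachedBlock> map = new HashMap<>();
        map.put("fits", new CachedBlock(0L, 4096));
        map.put("tooBig", new CachedBlock(8192L, 1 << 20));  // larger than any bucket size
        map.put("pastEnd", new CachedBlock(1 << 30, 4096));  // offset beyond a shrunken cache
        rebuild(map, new int[] { 5 * 1024, 9 * 1024 }, 16 * 1024, 4);
        System.out.println("entries kept: " + map.size());   // -> 1
      }
    }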
http://git-wip-us.apache.org/repos/asf/hbase/blob/b694b63e/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index c0a9e17..a321556 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -931,12 +931,13 @@ public class BucketCache implements BlockCache, HeapSize {
             + ", expected:" + backingMap.getClass().getName());
       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
           .readObject();
+      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
+          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
       BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
-          backingMap, realCacheSize);
-      backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
-          .readObject();
+          backingMapFromFile, realCacheSize);
       bucketAllocator = allocator;
       deserialiserMap = deserMap;
+      backingMap = backingMapFromFile;
     } finally {
       if (ois != null) ois.close();
       if (fis != null) fis.close();
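Note the ordering in the hunk above: before this patch, the BucketAllocator was
reconstructed from the live backingMap, which at that point in retrieveFromFile() was
still empty because the persisted map was only deserialized afterwards, so the
allocator's buckets were never reconfigured to the persisted blocks. The fix reads the
persisted map first, rebuilds the allocator from it, and only then publishes it as the
live map. A reduced, JDK-only sketch of the corrected order (the Index class and file
handling are illustrative, not HBase API):

    import java.io.*;
    import java.util.HashMap;

    public class RestoreOrderSketch {
      // Illustrative stand-in for BucketAllocator: derived state built from the map.
      static final class Index {
        final int entries;
        Index(HashMap<String, Integer> map) { this.entries = map.size(); }
      }

      static HashMap<String, Integer> liveMap = new HashMap<>();
      static Index index;

      @SuppressWarnings("unchecked")
      static void retrieveFromFile(File f) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(f))) {
          // 1. Read the persisted map FIRST...
          HashMap<String, Integer> mapFromFile = (HashMap<String, Integer>) ois.readObject();
          // 2. ...then rebuild derived state from what was actually read...
          index = new Index(mapFromFile);
          // 3. ...and only then publish the restored map.
          liveMap = mapFromFile;
        }
      }

      public static void main(String[] args) throws Exception {
        File f = File.createTempFile("restore", ".bin");
        HashMap<String, Integer> persisted = new HashMap<>();
        persisted.put("block-0", 4096);
        try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(f))) {
          oos.writeObject(persisted);
        }
        retrieveFromFile(f);
        System.out.println(index.entries); // 1, not 0: index built from the restored map
        f.delete();
      }
    }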
http://git-wip-us.apache.org/repos/asf/hbase/blob/b694b63e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index 6293b07..e584350 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.util.ChecksumType;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class CacheTestUtils {
 
   private static final boolean includesMemstoreTS = true;
@@ -333,7 +335,7 @@ public class CacheTestUtils {
 
   }
 
-  private static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
+  public static HFileBlockPair[] generateHFileBlocks(int blockSize, int numBlocks) {
     HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
     Random rand = new Random();
     HashSet<String> usedStrings = new HashSet<String>();
@@ -376,8 +378,17 @@ public class CacheTestUtils {
     return returnedBlocks;
   }
 
-  private static class HFileBlockPair {
+  @VisibleForTesting
+  public static class HFileBlockPair {
     BlockCacheKey blockName;
     HFileBlock block;
+
+    public BlockCacheKey getBlockName() {
+      return this.blockName;
+    }
+
+    public HFileBlock getBlock() {
+      return this.block;
+    }
   }
 }
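The CacheTestUtils change only widens visibility: generateHFileBlocks() becomes public
and HFileBlockPair grows accessors, so tests outside this class, like the new test
below, can generate blocks and cache them by key. Intended usage looks roughly like
this helper (a sketch assuming the HBase test classpath; cacheGeneratedBlocks is a
hypothetical name, not part of the patch):

    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
    import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;

    // Cache `count` generated test blocks of `blockSize` bytes into `cache`.
    static void cacheGeneratedBlocks(BlockCache cache, int blockSize, int count) {
      HFileBlockPair[] pairs = CacheTestUtils.generateHFileBlocks(blockSize, count);
      for (HFileBlockPair pair : pairs) {
        cache.cacheBlock(pair.getBlockName(), pair.getBlock());
      }
    }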
http://git-wip-us.apache.org/repos/asf/hbase/blob/b694b63e/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index e10689b..7aef1d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -29,8 +29,11 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
@@ -219,4 +222,49 @@ public class TestBucketCache {
     assertTrue(cache.getCurrentSize() > 0L);
     assertTrue("We should have a block!", cache.iterator().hasNext());
   }
+
+  @Test
+  public void testRetrieveFromFile() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+            + "/bucket.persistence");
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize == 0);
+
+    HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+    // Add blocks
+    for (HFileBlockPair block : blocks) {
+      bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
+    }
+    for (HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // restore cache from file
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir
+            + "/bucket.persistence");
+    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // reconfigure bucket sizes: the biggest bucket is smaller than constructedBlockSize
+    // (8k or 16k), so the cache can't be restored from the file
+    int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize,
+        constructedBlockSize, smallBucketSizes, writeThreads,
+        writerQLen, testDir + "/bucket.persistence");
+    assertEquals(0, bucketCache.getAllocator().getUsedSize());
+    assertEquals(0, bucketCache.backingMap.size());
+
+    TEST_UTIL.cleanupTestDir();
+  }
 }
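The new test drives the same persistence path an operator would configure in
production: a file IOEngine plus a persistence file, a shutdown/restart cycle that must
preserve the allocator's used size, and an intentionally incompatible bucket-size
reconfiguration that must now yield an empty but usable cache instead of a failed
startup. For reference, a file-backed, persistent bucket cache is typically enabled
with hbase-site.xml settings along these lines (paths and size are illustrative):

    <property>
      <name>hbase.bucketcache.ioengine</name>
      <value>file:/mnt/ssd/bucket.cache</value>
    </property>
    <property>
      <name>hbase.bucketcache.size</name>
      <value>4096</value> <!-- in MB -->
    </property>
    <property>
      <name>hbase.bucketcache.persistent.path</name>
      <value>/mnt/ssd/bucket.persistence</value>
    </property>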