Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 a30f7ddc9 -> d91a28a45


HBASE-16460 Can't rebuild the BucketAllocator's data structures when 
BucketCache uses FileIOEngine (Guanghao Zhang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d91a28a4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d91a28a4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d91a28a4

Branch: refs/heads/branch-1.1
Commit: d91a28a450fc0f697bf78aab07543cd48f7dedfc
Parents: a30f7dd
Author: tedyu <yuzhih...@gmail.com>
Authored: Mon Sep 5 17:29:44 2016 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Mon Sep 5 17:29:44 2016 -0700

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/BucketAllocator.java  | 34 ++++++++++----
 .../hbase/io/hfile/bucket/BucketCache.java      |  7 +--
 .../hadoop/hbase/io/hfile/CacheTestUtils.java   | 15 +++++-
 .../hbase/io/hfile/bucket/TestBucketCache.java  | 48 ++++++++++++++++++++
 4 files changed, 90 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d91a28a4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
index fb95007..aece6a6 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketAllocator.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.io.hfile.bucket;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -341,25 +342,31 @@ public final class BucketAllocator {
     // we've found. we can only reconfigure each bucket once; if more than once,
     // we know there's a bug, so we just log the info, throw, and start again...
     boolean[] reconfigured = new boolean[buckets.length];
-    for (Map.Entry<BlockCacheKey, BucketEntry> entry : map.entrySet()) {
+    int sizeNotMatchedCount = 0;
+    int insufficientCapacityCount = 0;
+    Iterator<Map.Entry<BlockCacheKey, BucketEntry>> iterator = 
map.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<BlockCacheKey, BucketEntry> entry = iterator.next();
       long foundOffset = entry.getValue().offset();
       int foundLen = entry.getValue().getLength();
       int bucketSizeIndex = -1;
-      for (int i = 0; i < bucketSizes.length; ++i) {
-        if (foundLen <= bucketSizes[i]) {
+      for (int i = 0; i < this.bucketSizes.length; ++i) {
+        if (foundLen <= this.bucketSizes[i]) {
           bucketSizeIndex = i;
           break;
         }
       }
       if (bucketSizeIndex == -1) {
-        throw new BucketAllocatorException(
-            "Can't match bucket size for the block with size " + foundLen);
+        sizeNotMatchedCount++;
+        iterator.remove();
+        continue;
       }
       int bucketNo = (int) (foundOffset / bucketCapacity);
-      if (bucketNo < 0 || bucketNo >= buckets.length)
-        throw new BucketAllocatorException("Can't find bucket " + bucketNo
-            + ", total buckets=" + buckets.length
-            + "; did you shrink the cache?");
+      if (bucketNo < 0 || bucketNo >= buckets.length) {
+        insufficientCapacityCount++;
+        iterator.remove();
+        continue;
+      }
       Bucket b = buckets[bucketNo];
       if (reconfigured[bucketNo]) {
         if (b.sizeIndex() != bucketSizeIndex)
@@ -382,6 +389,15 @@ public final class BucketAllocator {
       usedSize += buckets[bucketNo].getItemAllocationSize();
       bucketSizeInfos[bucketSizeIndex].blockAllocated(b);
     }
+
+    if (sizeNotMatchedCount > 0) {
+      LOG.warn("There are " + sizeNotMatchedCount + " blocks which can't be 
rebuilt because "
+          + "there is no matching bucket size for these blocks");
+    }
+    if (insufficientCapacityCount > 0) {
+      LOG.warn("There are " + insufficientCapacityCount + " blocks which can't 
be rebuilt - "
+          + "did you shrink the cache?");
+    }
   }
 
   public String toString() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d91a28a4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 54b9398..cbd38f4 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -938,12 +938,13 @@ public class BucketCache implements BlockCache, HeapSize {
             + ", expected:" + backingMap.getClass().getName());
       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
           .readObject();
+      ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
+          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
       BucketAllocator allocator = new BucketAllocator(cacheCapacity, 
bucketSizes,
-          backingMap, realCacheSize);
-      backingMap = (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois
-          .readObject();
+        backingMapFromFile, realCacheSize);
       bucketAllocator = allocator;
       deserialiserMap = deserMap;
+      backingMap = backingMapFromFile;
     } finally {
       if (ois != null) ois.close();
       if (fis != null) fis.close();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d91a28a4/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
index b0a2ba2..247269d 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.util.ChecksumType;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class CacheTestUtils {
 
   private static final boolean includesMemstoreTS = true;
@@ -315,7 +317,7 @@ public class CacheTestUtils {
   }
 
 
-  private static HFileBlockPair[] generateHFileBlocks(int blockSize,
+  public static HFileBlockPair[] generateHFileBlocks(int blockSize,
       int numBlocks) {
     HFileBlockPair[] returnedBlocks = new HFileBlockPair[numBlocks];
     Random rand = new Random();
@@ -367,8 +369,17 @@ public class CacheTestUtils {
     return returnedBlocks;
   }
 
-  private static class HFileBlockPair {
+  @VisibleForTesting
+  public static class HFileBlockPair {
     BlockCacheKey blockName;
     HFileBlock block;
+
+    public BlockCacheKey getBlockName() {
+      return this.blockName;
+    }
+
+    public HFileBlock getBlock() {
+      return this.block;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d91a28a4/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index f004868..91d481f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -28,8 +28,11 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
 import org.apache.hadoop.hbase.io.hfile.CacheTestUtils;
+import org.apache.hadoop.hbase.io.hfile.CacheTestUtils.HFileBlockPair;
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
@@ -218,4 +221,49 @@ public class TestBucketCache {
     assertTrue(cache.getCurrentSize() > 0L);
     assertTrue("We should have a block!", cache.iterator().hasNext());
   }
+
+  @Test
+  public void testRetrieveFromFile() throws Exception {
+    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+    Path testDir = TEST_UTIL.getDataTestDir();
+    TEST_UTIL.getTestFileSystem().mkdirs(testDir);
+
+    BucketCache bucketCache = new BucketCache("file:" + testDir + 
"/bucket.cache", capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
testDir
+            + "/bucket.persistence");
+    long usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize == 0);
+
+    HFileBlockPair[] blocks = 
CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1);
+    // Add blocks
+    for (HFileBlockPair block : blocks) {
+      bucketCache.cacheBlock(block.getBlockName(), block.getBlock());
+    }
+    for (HFileBlockPair block : blocks) {
+      cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), 
block.getBlock());
+    }
+    usedSize = bucketCache.getAllocator().getUsedSize();
+    assertTrue(usedSize != 0);
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // restore cache from file
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, 
testDir
+            + "/bucket.persistence");
+    assertEquals(usedSize, bucketCache.getAllocator().getUsedSize());
+    // persist cache to file
+    bucketCache.shutdown();
+
+    // reconfigure bucket sizes; the biggest bucket is now smaller than constructedBlockSize
+    // (8k or 16k), so the cached blocks can't be restored from file
+    int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 };
+    bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", 
capacitySize,
+        constructedBlockSize, smallBucketSizes, writeThreads,
+        writerQLen, testDir + "/bucket.persistence");
+    assertEquals(0, bucketCache.getAllocator().getUsedSize());
+    assertEquals(0, bucketCache.backingMap.size());
+
+    TEST_UTIL.cleanupTestDir();
+  }
 }

Reply via email to