This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new d2c0350034d HBASE-28923 Prioritize "orphan" blocks for eviction inside
BucketCache.freespace (#6373)
d2c0350034d is described below
commit d2c0350034d4408e7b27fe18bdf60796cd231992
Author: Wellington Ramos Chevreuil <[email protected]>
AuthorDate: Wed Oct 30 10:54:13 2024 +0000
HBASE-28923 Prioritize "orphan" blocks for eviction inside
BucketCache.freespace (#6373)
Signed-off-by: Ankit Singhal <[email protected]>
Reviewed by: Kota-SH <[email protected]>
Reviewed by: Vinayak Hegde <[email protected]>
---
.../apache/hadoop/hbase/io/hfile/BlockCache.java | 7 +-
.../hadoop/hbase/io/hfile/BlockCacheFactory.java | 17 ++-
.../hadoop/hbase/io/hfile/BlockCacheUtil.java | 14 +++
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 118 +++++++++++++++------
.../hbase/io/hfile/bucket/TestBucketCache.java | 92 +++++++++++++---
5 files changed, 196 insertions(+), 52 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index 775bcb99b62..8431dbf25a8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -271,9 +271,10 @@ public interface BlockCache extends Iterable<CachedBlock> {
}
/**
- * Wait for the bucket cache to be enabled while server restart
- * @param timeout time to wait for the bucket cache to be enable
- * @return boolean true if the bucket cache is enabled, false otherwise
+ * Wait for the block cache implementation to be completely enabled. Some
block cache
+ * implementations may take longer to initialise, and this initialisation
may be asynchronous.
+ * @param timeout time to wait for the cache to become enabled.
+ * @return boolean true if the cache is enabled, false otherwise.
*/
default boolean waitForCacheInitialization(long timeout) {
return true;
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
index 6956d584d92..744a6bbf012 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
@@ -22,11 +22,13 @@ import static
org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_PERSISTENT_PATH_KE
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import java.io.IOException;
+import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -83,7 +85,8 @@ public final class BlockCacheFactory {
private BlockCacheFactory() {
}
- public static BlockCache createBlockCache(Configuration conf) {
+ public static BlockCache createBlockCache(Configuration conf,
+ Map<String, HRegion> onlineRegions) {
FirstLevelBlockCache l1Cache = createFirstLevelCache(conf);
if (l1Cache == null) {
return null;
@@ -96,7 +99,7 @@ public final class BlockCacheFactory {
: new InclusiveCombinedBlockCache(l1Cache, l2CacheInstance);
} else {
// otherwise use the bucket cache.
- BucketCache bucketCache = createBucketCache(conf);
+ BucketCache bucketCache = createBucketCache(conf, onlineRegions);
if (!conf.getBoolean("hbase.bucketcache.combinedcache.enabled", true)) {
// Non combined mode is off from 2.0
LOG.warn(
@@ -106,6 +109,10 @@ public final class BlockCacheFactory {
}
}
+ public static BlockCache createBlockCache(Configuration conf) {
+ return createBlockCache(conf, null);
+ }
+
private static FirstLevelBlockCache createFirstLevelCache(final
Configuration c) {
final long cacheSize = MemorySizeUtil.getOnHeapCacheSize(c);
if (cacheSize < 0) {
@@ -179,7 +186,8 @@ public final class BlockCacheFactory {
}
- private static BucketCache createBucketCache(Configuration c) {
+ private static BucketCache createBucketCache(Configuration c,
+ Map<String, HRegion> onlineRegions) {
// Check for L2. ioengine name must be non-null.
String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <=
0) {
@@ -225,7 +233,8 @@ public final class BlockCacheFactory {
BucketCache.DEFAULT_ERROR_TOLERATION_DURATION);
// Bucket cache logs its stats on creation internal to the constructor.
bucketCache = new BucketCache(bucketCacheIOEngineName, bucketCacheSize,
blockSize,
- bucketSizes, writerThreads, writerQueueLen, persistentPath,
ioErrorsTolerationDuration, c);
+ bucketSizes, writerThreads, writerQueueLen, persistentPath,
ioErrorsTolerationDuration, c,
+ onlineRegions);
} catch (IOException ioex) {
LOG.error("Can't instantiate bucket cache", ioex);
throw new RuntimeException(ioex);
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index 3d4698b0047..7324701efe5 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -21,13 +21,17 @@ import static
org.apache.hadoop.hbase.io.hfile.HFileBlock.FILL_HEADER;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.metrics.impl.FastLongHistogram;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.GsonUtil;
@@ -242,6 +246,16 @@ public class BlockCacheUtil {
}
}
+ public static Set<String> listAllFilesNames(Map<String, HRegion>
onlineRegions) {
+ Set<String> files = new HashSet<>();
+ onlineRegions.values().forEach(r -> {
+ r.getStores().forEach(s -> {
+ s.getStorefiles().forEach(f -> files.add(f.getPath().getName()));
+ });
+ });
+ return files;
+ }
+
private static final int DEFAULT_MAX = 1000000;
public static int getMaxCachedBlocksByFile(Configuration conf) {
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index e7be17ad471..5c65e42e5f8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.RefCnt;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -131,6 +132,12 @@ public class BucketCache implements BlockCache, HeapSize {
private static final String STRONG_REF_KEY =
"hbase.bucketcache.offsetlock.usestrongref";
private static final boolean STRONG_REF_DEFAULT = false;
+ /** Minimum cached age, read as milliseconds despite the ".seconds" key
suffix, before a block is checked against the online regions' files. */
+ static final String BLOCK_ORPHAN_GRACE_PERIOD =
+ "hbase.bucketcache.block.orphan.evictgraceperiod.seconds";
+
+ static final long BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT = 24 * 60 * 60 * 1000L;
+
/** Priority buckets */
static final float DEFAULT_SINGLE_FACTOR = 0.25f;
static final float DEFAULT_MULTI_FACTOR = 0.50f;
@@ -297,6 +304,10 @@ public class BucketCache implements BlockCache, HeapSize {
private long allocFailLogPrevTs; // time of previous log event for
allocation failure.
private static final int ALLOCATION_FAIL_LOG_TIME_PERIOD = 60000; // Default
1 minute.
+ private Map<String, HRegion> onlineRegions;
+
+ private long orphanBlockGracePeriod = 0;
+
public BucketCache(String ioEngineName, long capacity, int blockSize, int[]
bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath) throws
IOException {
this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum,
writerQLen,
@@ -306,6 +317,13 @@ public class BucketCache implements BlockCache, HeapSize {
public BucketCache(String ioEngineName, long capacity, int blockSize, int[]
bucketSizes,
int writerThreadNum, int writerQLen, String persistencePath, int
ioErrorsTolerationDuration,
Configuration conf) throws IOException {
+ this(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum,
writerQLen,
+ persistencePath, ioErrorsTolerationDuration, conf, null);
+ }
+
+ public BucketCache(String ioEngineName, long capacity, int blockSize, int[]
bucketSizes,
+ int writerThreadNum, int writerQLen, String persistencePath, int
ioErrorsTolerationDuration,
+ Configuration conf, Map<String, HRegion> onlineRegions) throws IOException
{
Preconditions.checkArgument(blockSize > 0,
"BucketCache capacity is set to " + blockSize + ", can not be less than
0");
boolean useStrongRef = conf.getBoolean(STRONG_REF_KEY, STRONG_REF_DEFAULT);
@@ -318,6 +336,9 @@ public class BucketCache implements BlockCache, HeapSize {
this.algorithm = conf.get(FILE_VERIFY_ALGORITHM,
DEFAULT_FILE_VERIFY_ALGORITHM);
this.ioEngine = getIOEngineFromName(ioEngineName, capacity,
persistencePath);
this.writerThreads = new WriterThread[writerThreadNum];
+ this.onlineRegions = onlineRegions;
+ this.orphanBlockGracePeriod =
+ conf.getLong(BLOCK_ORPHAN_GRACE_PERIOD,
BLOCK_ORPHAN_GRACE_PERIOD_DEFAULT);
long blockNumCapacity = capacity / blockSize;
if (blockNumCapacity >= Integer.MAX_VALUE) {
// Enough for about 32TB of cache!
@@ -1032,6 +1053,29 @@ public class BucketCache implements BlockCache, HeapSize
{
}
}
+ private long calculateBytesToFree(StringBuilder msgBuffer) {
+ long bytesToFreeWithoutExtra = 0;
+ BucketAllocator.IndexStatistics[] stats =
bucketAllocator.getIndexStatistics();
+ long[] bytesToFreeForBucket = new long[stats.length];
+ for (int i = 0; i < stats.length; i++) {
+ bytesToFreeForBucket[i] = 0;
+ long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 -
minFactor));
+ freeGoal = Math.max(freeGoal, 1);
+ if (stats[i].freeCount() < freeGoal) {
+ bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal -
stats[i].freeCount());
+ bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
+ if (msgBuffer != null) {
+ msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() + ")="
+ + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
+ }
+ }
+ }
+ if (msgBuffer != null) {
+ msgBuffer.append("Free for total=" +
StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
+ }
+ return bytesToFreeWithoutExtra;
+ }
+
/**
* Free the space if the used size reaches acceptableSize() or one size
block couldn't be
* allocated. When freeing the space, we use the LRU algorithm and ensure
there must be some
@@ -1048,43 +1092,21 @@ public class BucketCache implements BlockCache,
HeapSize {
}
try {
freeInProgress = true;
- long bytesToFreeWithoutExtra = 0;
- // Calculate free byte for each bucketSizeinfo
StringBuilder msgBuffer = LOG.isDebugEnabled() ? new StringBuilder() :
null;
- BucketAllocator.IndexStatistics[] stats =
bucketAllocator.getIndexStatistics();
- long[] bytesToFreeForBucket = new long[stats.length];
- for (int i = 0; i < stats.length; i++) {
- bytesToFreeForBucket[i] = 0;
- long freeGoal = (long) Math.floor(stats[i].totalCount() * (1 -
minFactor));
- freeGoal = Math.max(freeGoal, 1);
- if (stats[i].freeCount() < freeGoal) {
- bytesToFreeForBucket[i] = stats[i].itemSize() * (freeGoal -
stats[i].freeCount());
- bytesToFreeWithoutExtra += bytesToFreeForBucket[i];
- if (msgBuffer != null) {
- msgBuffer.append("Free for bucketSize(" + stats[i].itemSize() +
")="
- + StringUtils.byteDesc(bytesToFreeForBucket[i]) + ", ");
- }
- }
- }
- if (msgBuffer != null) {
- msgBuffer.append("Free for total=" +
StringUtils.byteDesc(bytesToFreeWithoutExtra) + ", ");
- }
-
+ long bytesToFreeWithoutExtra = calculateBytesToFree(msgBuffer);
if (bytesToFreeWithoutExtra <= 0) {
return;
}
long currentSize = bucketAllocator.getUsedSize();
long totalSize = bucketAllocator.getTotalSize();
if (LOG.isDebugEnabled() && msgBuffer != null) {
- LOG.debug("Free started because \"" + why + "\"; " +
msgBuffer.toString()
- + " of current used=" + StringUtils.byteDesc(currentSize) + ",
actual cacheSize="
+ LOG.debug("Free started because \"" + why + "\"; " + msgBuffer + " of
current used="
+ + StringUtils.byteDesc(currentSize) + ", actual cacheSize="
+ StringUtils.byteDesc(realCacheSize.sum()) + ", total="
+ StringUtils.byteDesc(totalSize));
}
-
long bytesToFreeWithExtra =
(long) Math.floor(bytesToFreeWithoutExtra * (1 + extraFreeFactor));
-
// Instantiate priority buckets
BucketEntryGroup bucketSingle =
new BucketEntryGroup(bytesToFreeWithExtra, blockSize,
getPartitionSize(singleFactor));
@@ -1093,10 +1115,48 @@ public class BucketCache implements BlockCache,
HeapSize {
BucketEntryGroup bucketMemory =
new BucketEntryGroup(bytesToFreeWithExtra, blockSize,
getPartitionSize(memoryFactor));
+ Set<String> allValidFiles = null;
+ // We need the region/stores/files tree, in order to figure out if a
block is "orphan" or not.
+ // See further comments below for more details.
+ if (onlineRegions != null) {
+ allValidFiles = BlockCacheUtil.listAllFilesNames(onlineRegions);
+ }
+ // the cached time is recorded in nanos, so we need to convert the grace
period accordingly
+ long orphanGracePeriodNanos = orphanBlockGracePeriod * 1000000;
+ long bytesFreed = 0;
// Scan entire map putting bucket entry into appropriate bucket entry
// group
for (Map.Entry<BlockCacheKey, BucketEntry> bucketEntryWithKey :
backingMap.entrySet()) {
- switch (bucketEntryWithKey.getValue().getPriority()) {
+ BlockCacheKey key = bucketEntryWithKey.getKey();
+ BucketEntry entry = bucketEntryWithKey.getValue();
+ // Under certain conditions, blocks for regions not on the current
region server might
+ // be hanging on the cache. For example, when using the persistent
cache feature, if the
+ // RS crashes and a different set of regions is assigned to it once it is
online again, blocks
+ // for the previous online regions would be recovered and stay in the
cache. These would be
+ // "orphan" blocks, as the files these blocks belong to are not in any
of the online
+ // regions.
+ // "Orphan" blocks are a pure waste of cache space and should be
evicted first during
+ // the freespace run.
+ // Compactions and Flushes may cache blocks before their files are
completely written. In
+ // these cases the file won't be found in any of the online regions
stores, but the block
+ // shouldn't be evicted. To avoid this, we defined this
+ // hbase.bucketcache.block.orphan.evictgraceperiod.seconds property, to
account for a grace
+ // period (default 24 hours) after which a block is checked for being
an orphan block.
+ if (
+ allValidFiles != null
+ && entry.getCachedTime() < (System.nanoTime() -
orphanGracePeriodNanos)
+ ) {
+ if (!allValidFiles.contains(key.getHfileName())) {
+ if (evictBucketEntryIfNoRpcReferenced(key, entry)) {
+ // We calculate the freed bytes, but we don't stop if the goal
was reached because
+ // these are orphan blocks anyway, so let's leverage this run of
freeSpace
+ // to get rid of all orphans at once.
+ bytesFreed += entry.getLength();
+ continue;
+ }
+ }
+ }
+ switch (entry.getPriority()) {
case SINGLE: {
bucketSingle.add(bucketEntryWithKey);
break;
@@ -1111,7 +1171,6 @@ public class BucketCache implements BlockCache, HeapSize {
}
}
}
-
PriorityQueue<BucketEntryGroup> bucketQueue =
new PriorityQueue<>(3,
Comparator.comparingLong(BucketEntryGroup::overflow));
@@ -1120,7 +1179,6 @@ public class BucketCache implements BlockCache, HeapSize {
bucketQueue.add(bucketMemory);
int remainingBuckets = bucketQueue.size();
- long bytesFreed = 0;
BucketEntryGroup bucketGroup;
while ((bucketGroup = bucketQueue.poll()) != null) {
@@ -1137,18 +1195,15 @@ public class BucketCache implements BlockCache,
HeapSize {
if (bucketSizesAboveThresholdCount(minFactor) > 0) {
bucketQueue.clear();
remainingBuckets = 3;
-
bucketQueue.add(bucketSingle);
bucketQueue.add(bucketMulti);
bucketQueue.add(bucketMemory);
-
while ((bucketGroup = bucketQueue.poll()) != null) {
long bucketBytesToFree = (bytesToFreeWithExtra - bytesFreed) /
remainingBuckets;
bytesFreed += bucketGroup.free(bucketBytesToFree);
remainingBuckets--;
}
}
-
// Even after the above free we might still need freeing because of the
// De-fragmentation of the buckets (also called Slab Calcification
problem), i.e
// there might be some buckets where the occupancy is very sparse and
thus are not
@@ -1167,7 +1222,6 @@ public class BucketCache implements BlockCache, HeapSize {
+ StringUtils.byteDesc(multi) + ", " + "memory=" +
StringUtils.byteDesc(memory));
}
}
-
} catch (Throwable t) {
LOG.warn("Failed freeing space", t);
} finally {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
index 8d3211cab9c..3af648e6cf4 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java
@@ -17,12 +17,17 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
+import static
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.ACCEPT_FACTOR_CONFIG_NAME;
+import static
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.BLOCK_ORPHAN_GRACE_PERIOD;
+import static
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME;
+import static
org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.MIN_FACTOR_CONFIG_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
@@ -30,6 +35,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -56,6 +63,9 @@ import
org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache;
import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry;
import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -464,7 +474,7 @@ public class TestBucketCache {
BucketCache.DEFAULT_MIN_FACTOR);
Configuration conf = HBaseConfiguration.create();
- conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
+ conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
@@ -500,9 +510,9 @@ public class TestBucketCache {
@Test
public void testValidBucketCacheConfigs() throws IOException {
Configuration conf = HBaseConfiguration.create();
- conf.setFloat(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
- conf.setFloat(BucketCache.MIN_FACTOR_CONFIG_NAME, 0.5f);
- conf.setFloat(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
+ conf.setFloat(ACCEPT_FACTOR_CONFIG_NAME, 0.9f);
+ conf.setFloat(MIN_FACTOR_CONFIG_NAME, 0.5f);
+ conf.setFloat(EXTRA_FREE_FACTOR_CONFIG_NAME, 0.5f);
conf.setFloat(BucketCache.SINGLE_FACTOR_CONFIG_NAME, 0.1f);
conf.setFloat(BucketCache.MULTI_FACTOR_CONFIG_NAME, 0.7f);
conf.setFloat(BucketCache.MEMORY_FACTOR_CONFIG_NAME, 0.2f);
@@ -511,11 +521,10 @@ public class TestBucketCache {
constructedBlockSizes, writeThreads, writerQLen, null, 100, conf);
assertTrue(cache.waitForCacheInitialization(10000));
- assertEquals(BucketCache.ACCEPT_FACTOR_CONFIG_NAME + " failed to
propagate.", 0.9f,
+ assertEquals(ACCEPT_FACTOR_CONFIG_NAME + " failed to propagate.", 0.9f,
cache.getAcceptableFactor(), 0);
- assertEquals(BucketCache.MIN_FACTOR_CONFIG_NAME + " failed to propagate.",
0.5f,
- cache.getMinFactor(), 0);
- assertEquals(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to
propagate.", 0.5f,
+ assertEquals(MIN_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
cache.getMinFactor(), 0);
+ assertEquals(EXTRA_FREE_FACTOR_CONFIG_NAME + " failed to propagate.", 0.5f,
cache.getExtraFreeFactor(), 0);
assertEquals(BucketCache.SINGLE_FACTOR_CONFIG_NAME + " failed to
propagate.", 0.1f,
cache.getSingleFactor(), 0);
@@ -529,8 +538,7 @@ public class TestBucketCache {
public void testInvalidAcceptFactorConfig() throws IOException {
float[] configValues = { -1f, 0.2f, 0.86f, 1.05f };
boolean[] expectedOutcomes = { false, false, true, false };
- Map<String, float[]> configMappings =
- ImmutableMap.of(BucketCache.ACCEPT_FACTOR_CONFIG_NAME, configValues);
+ Map<String, float[]> configMappings =
ImmutableMap.of(ACCEPT_FACTOR_CONFIG_NAME, configValues);
Configuration conf = HBaseConfiguration.create();
checkConfigValues(conf, configMappings, expectedOutcomes);
}
@@ -540,8 +548,7 @@ public class TestBucketCache {
float[] configValues = { -1f, 0f, 0.96f, 1.05f };
// throws due to <0, in expected range, minFactor > acceptableFactor, > 1.0
boolean[] expectedOutcomes = { false, true, false, false };
- Map<String, float[]> configMappings =
- ImmutableMap.of(BucketCache.MIN_FACTOR_CONFIG_NAME, configValues);
+ Map<String, float[]> configMappings =
ImmutableMap.of(MIN_FACTOR_CONFIG_NAME, configValues);
Configuration conf = HBaseConfiguration.create();
checkConfigValues(conf, configMappings, expectedOutcomes);
}
@@ -552,7 +559,7 @@ public class TestBucketCache {
// throws due to <0, in expected range, in expected range, config can be >
1.0
boolean[] expectedOutcomes = { false, true, true, true };
Map<String, float[]> configMappings =
- ImmutableMap.of(BucketCache.EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
+ ImmutableMap.of(EXTRA_FREE_FACTOR_CONFIG_NAME, configValues);
Configuration conf = HBaseConfiguration.create();
checkConfigValues(conf, configMappings, expectedOutcomes);
}
@@ -970,4 +977,63 @@ public class TestBucketCache {
totalBlocksToCheck * constructedBlockSize);
return bucketCache;
}
+
+ @Test
+ public void testEvictOrphansOutOfGracePeriod() throws Exception {
+ BucketCache bucketCache = testEvictOrphans(0);
+ assertEquals(10, bucketCache.getBackingMap().size());
+ assertEquals(0, bucketCache.blocksByHFile.stream()
+ .filter(key ->
key.getHfileName().equals("testEvictOrphans-orphan")).count());
+ }
+
+ @Test
+ public void testEvictOrphansWithinGracePeriod() throws Exception {
+ BucketCache bucketCache = testEvictOrphans(60 * 60 * 1000L);
+ assertEquals(18, bucketCache.getBackingMap().size());
+ assertTrue(bucketCache.blocksByHFile.stream()
+ .filter(key ->
key.getHfileName().equals("testEvictOrphans-orphan")).count() > 0);
+ }
+
+ private BucketCache testEvictOrphans(long orphanEvictionGracePeriod) throws
Exception {
+ Path validFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
"testEvictOrphans-valid");
+ Path orphanFile = new Path(HBASE_TESTING_UTILITY.getDataTestDir(),
"testEvictOrphans-orphan");
+ Map<String, HRegion> onlineRegions = new HashMap<>();
+ List<HStore> stores = new ArrayList<>();
+ Collection<HStoreFile> storeFiles = new ArrayList<>();
+ HRegion mockedRegion = mock(HRegion.class);
+ HStore mockedStore = mock(HStore.class);
+ HStoreFile mockedStoreFile = mock(HStoreFile.class);
+ when(mockedStoreFile.getPath()).thenReturn(validFile);
+ storeFiles.add(mockedStoreFile);
+ when(mockedStore.getStorefiles()).thenReturn(storeFiles);
+ stores.add(mockedStore);
+ when(mockedRegion.getStores()).thenReturn(stores);
+ onlineRegions.put("mocked_region", mockedRegion);
+ HBASE_TESTING_UTILITY.getConfiguration().setDouble(MIN_FACTOR_CONFIG_NAME,
0.99);
+
HBASE_TESTING_UTILITY.getConfiguration().setDouble(ACCEPT_FACTOR_CONFIG_NAME,
1);
+
HBASE_TESTING_UTILITY.getConfiguration().setDouble(EXTRA_FREE_FACTOR_CONFIG_NAME,
0.01);
+ HBASE_TESTING_UTILITY.getConfiguration().setLong(BLOCK_ORPHAN_GRACE_PERIOD,
+ orphanEvictionGracePeriod);
+ BucketCache bucketCache = new BucketCache(ioEngineName,
(constructedBlockSize + 1024) * 21,
+ constructedBlockSize, new int[] { constructedBlockSize + 1024 }, 1, 1,
null, 60 * 1000,
+ HBASE_TESTING_UTILITY.getConfiguration(), onlineRegions);
+ HFileBlockPair[] validBlockPairs =
+ CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10,
validFile);
+ HFileBlockPair[] orphanBlockPairs =
+ CacheTestUtils.generateBlocksForPath(constructedBlockSize, 10,
orphanFile);
+ for (HFileBlockPair pair : validBlockPairs) {
+ bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(),
false, true);
+ }
+ waitUntilAllFlushedToBucket(bucketCache);
+ assertEquals(10, bucketCache.getBackingMap().size());
+ bucketCache.freeSpace("test");
+ assertEquals(10, bucketCache.getBackingMap().size());
+ for (HFileBlockPair pair : orphanBlockPairs) {
+ bucketCache.cacheBlockWithWait(pair.getBlockName(), pair.getBlock(),
false, true);
+ }
+ waitUntilAllFlushedToBucket(bucketCache);
+ assertEquals(20, bucketCache.getBackingMap().size());
+ bucketCache.freeSpace("test");
+ return bucketCache;
+ }
}