This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 8ec93ea HBASE-15560 W-TinyLFU based BlockCache
8ec93ea is described below
commit 8ec93ea193f6765fd2639ce851ef8cac7df3f555
Author: Ben Manes <[email protected]>
AuthorDate: Tue Apr 16 13:22:01 2019 -0700
HBASE-15560 W-TinyLFU based BlockCache
Signed-off-by: Andrew Purtell <[email protected]>
---
hbase-common/src/main/resources/hbase-default.xml | 5 +
.../src/main/resources/supplemental-models.xml | 26 ++
hbase-server/pom.xml | 4 +
.../hadoop/hbase/io/hfile/BlockCacheFactory.java | 33 +-
.../hadoop/hbase/io/hfile/CombinedBlockCache.java | 55 +--
.../hbase/io/hfile/FirstLevelBlockCache.java | 45 +++
.../io/hfile/InclusiveCombinedBlockCache.java | 8 +-
.../hadoop/hbase/io/hfile/LruBlockCache.java | 24 +-
.../hadoop/hbase/io/hfile/TinyLfuBlockCache.java | 417 +++++++++++++++++++++
.../hadoop/hbase/io/hfile/bucket/BucketCache.java | 9 +-
.../hbase/regionserver/HeapMemoryManager.java | 2 +-
.../hbase/io/hfile/TestBlockCacheReporting.java | 5 +-
.../hadoop/hbase/io/hfile/TestCacheConfig.java | 4 +-
.../hfile/TestScannerSelectionUsingKeyRange.java | 9 +-
.../hbase/io/hfile/TestTinyLfuBlockCache.java | 309 +++++++++++++++
hbase-shaded/pom.xml | 4 +
pom.xml | 6 +
17 files changed, 899 insertions(+), 66 deletions(-)
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 1c3e1a5..10eb47b 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -901,6 +901,11 @@ possible configurations would overwhelm and obscure the important.
The default thread pool size if parallel-seeking feature enabled.</description>
</property>
<property>
+ <name>hfile.block.cache.policy</name>
+ <value>LRU</value>
+ <description>The eviction policy for the L1 block cache (LRU or TinyLFU).</description>
+ </property>
+ <property>
<name>hfile.block.cache.size</name>
<value>0.4</value>
<description>Percentage of maximum heap (-Xmx setting) to allocate to block cache
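With the new key in place, switching the L1 eviction policy is a single override in
hbase-site.xml. A minimal illustrative snippet (an example, not literal content of this
patch):

    <property>
      <name>hfile.block.cache.policy</name>
      <value>TinyLFU</value>
    </property>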
diff --git a/hbase-resource-bundle/src/main/resources/supplemental-models.xml b/hbase-resource-bundle/src/main/resources/supplemental-models.xml
index 103919c..c3ffc3b 100644
--- a/hbase-resource-bundle/src/main/resources/supplemental-models.xml
+++ b/hbase-resource-bundle/src/main/resources/supplemental-models.xml
@@ -1858,4 +1858,30 @@ Copyright (c) 2007-2017 The JRuby project
</licenses>
</project>
</supplement>
+ <supplement>
+ <project>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+ </project>
+ </supplement>
+ <supplement>
+ <project>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ </license>
+ </licenses>
+ </project>
+ </supplement>
</supplementalDataModels>
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 725890c..106cea5 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -456,6 +456,10 @@
<optional>true</optional>
</dependency>
<dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
+ <dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
index 98b3c4f..4d62992 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheFactory.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
import java.io.IOException;
+import java.util.concurrent.ForkJoinPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
@@ -42,6 +43,12 @@ public final class BlockCacheFactory {
*/
/**
+ * Configuration key to cache block policy (Lru, TinyLfu).
+ */
+ public static final String BLOCKCACHE_POLICY_KEY = "hfile.block.cache.policy";
+ public static final String BLOCKCACHE_POLICY_DEFAULT = "LRU";
+
+ /**
* If the chosen ioengine can persist its state across restarts, the path to the file to persist
* to. This file is NOT the data file. It is a file into which we will serialize the map of
* what is in the data file. For example, if you pass the following argument as
@@ -85,16 +92,16 @@ public final class BlockCacheFactory {
}
public static BlockCache createBlockCache(Configuration conf) {
- LruBlockCache onHeapCache = createOnHeapCache(conf);
- if (onHeapCache == null) {
+ FirstLevelBlockCache l1Cache = createFirstLevelCache(conf);
+ if (l1Cache == null) {
return null;
}
boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
if (useExternal) {
BlockCache l2CacheInstance = createExternalBlockcache(conf);
return l2CacheInstance == null ?
- onHeapCache :
- new InclusiveCombinedBlockCache(onHeapCache, l2CacheInstance);
+ l1Cache :
+ new InclusiveCombinedBlockCache(l1Cache, l2CacheInstance);
} else {
// otherwise use the bucket cache.
BucketCache bucketCache = createBucketCache(conf);
@@ -103,20 +110,26 @@ public final class BlockCacheFactory {
LOG.warn(
"From HBase 2.0 onwards only combined mode of LRU cache and bucket cache is available");
}
- return bucketCache == null ? onHeapCache : new CombinedBlockCache(onHeapCache, bucketCache);
+ return bucketCache == null ? l1Cache : new CombinedBlockCache(l1Cache, bucketCache);
}
}
- private static LruBlockCache createOnHeapCache(final Configuration c) {
+ private static FirstLevelBlockCache createFirstLevelCache(final Configuration c) {
final long cacheSize = MemorySizeUtil.getOnHeapCacheSize(c);
if (cacheSize < 0) {
return null;
}
+ String policy = c.get(BLOCKCACHE_POLICY_KEY, BLOCKCACHE_POLICY_DEFAULT);
int blockSize = c.getInt(BLOCKCACHE_BLOCKSIZE_KEY, HConstants.DEFAULT_BLOCKSIZE);
- LOG.info(
- "Allocating onheap LruBlockCache size=" + StringUtils.byteDesc(cacheSize) + ", blockSize="
- + StringUtils.byteDesc(blockSize));
- return new LruBlockCache(cacheSize, blockSize, true, c);
+ LOG.info("Allocating BlockCache size=" +
+ StringUtils.byteDesc(cacheSize) + ", blockSize=" + StringUtils.byteDesc(blockSize));
+ if (policy.equalsIgnoreCase("LRU")) {
+ return new LruBlockCache(cacheSize, blockSize, true, c);
+ } else if (policy.equalsIgnoreCase("TinyLFU")) {
+ return new TinyLfuBlockCache(cacheSize, blockSize, ForkJoinPool.commonPool(), c);
+ } else {
+ throw new IllegalArgumentException("Unknown policy: " + policy);
+ }
}
/**
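To see the new dispatch end to end, a short sketch of selecting the policy
programmatically (illustrative only; the class name TinyLfuPolicyExample is
hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.BlockCache;
    import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;

    public class TinyLfuPolicyExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Only the policy key is new; sizing still comes from hfile.block.cache.size.
        conf.set(BlockCacheFactory.BLOCKCACHE_POLICY_KEY, "TinyLFU");
        BlockCache cache = BlockCacheFactory.createBlockCache(conf);
        System.out.println(cache); // a TinyLfuBlockCache, possibly wrapped in a CombinedBlockCache
      }
    }

Note that an unrecognized policy value fails fast with IllegalArgumentException, so a
typo surfaces at startup instead of silently falling back to LRU.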
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index b7b9c77..e5e57f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -30,22 +30,23 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
/**
* CombinedBlockCache is an abstraction layer that combines
- * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
+ * {@link FirstLevelBlockCache} and {@link BucketCache}. The smaller lruCache is used
* to cache bloom blocks and index blocks. The larger Cache is used to
* cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads
- * first from the smaller lruCache before looking for the block in the l2Cache.
+ * first from the smaller l1Cache before looking for the block in the l2Cache. Blocks evicted
+ * from l1Cache are put into the bucket cache.
* Metrics are the combined size and hits and misses of both caches.
*/
@InterfaceAudience.Private
public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
- protected final LruBlockCache onHeapCache;
+ protected final FirstLevelBlockCache l1Cache;
protected final BlockCache l2Cache;
protected final CombinedCacheStats combinedCacheStats;
- public CombinedBlockCache(LruBlockCache onHeapCache, BlockCache l2Cache) {
- this.onHeapCache = onHeapCache;
+ public CombinedBlockCache(FirstLevelBlockCache l1Cache, BlockCache l2Cache) {
+ this.l1Cache = l1Cache;
this.l2Cache = l2Cache;
- this.combinedCacheStats = new CombinedCacheStats(onHeapCache.getStats(),
+ this.combinedCacheStats = new CombinedCacheStats(l1Cache.getStats(),
l2Cache.getStats());
}
@@ -55,14 +56,14 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
if (l2Cache instanceof HeapSize) {
l2size = ((HeapSize) l2Cache).heapSize();
}
- return onHeapCache.heapSize() + l2size;
+ return l1Cache.heapSize() + l2size;
}
@Override
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
boolean metaBlock = buf.getBlockType().getCategory() != BlockCategory.DATA;
if (metaBlock) {
- onHeapCache.cacheBlock(cacheKey, buf, inMemory);
+ l1Cache.cacheBlock(cacheKey, buf, inMemory);
} else {
l2Cache.cacheBlock(cacheKey, buf, inMemory);
}
@@ -80,19 +81,19 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
// we end up calling l2Cache.getBlock.
// We are not in a position to exactly look at LRU cache or BC as BlockType may not be getting
// passed always.
- return onHeapCache.containsBlock(cacheKey)?
- onHeapCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics):
+ return l1Cache.containsBlock(cacheKey)?
+ l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics):
l2Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
}
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
- return onHeapCache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey);
+ return l1Cache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey);
}
@Override
public int evictBlocksByHfileName(String hfileName) {
- return onHeapCache.evictBlocksByHfileName(hfileName)
+ return l1Cache.evictBlocksByHfileName(hfileName)
+ l2Cache.evictBlocksByHfileName(hfileName);
}
@@ -103,43 +104,43 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
@Override
public void shutdown() {
- onHeapCache.shutdown();
+ l1Cache.shutdown();
l2Cache.shutdown();
}
@Override
public long size() {
- return onHeapCache.size() + l2Cache.size();
+ return l1Cache.size() + l2Cache.size();
}
@Override
public long getMaxSize() {
- return onHeapCache.getMaxSize() + l2Cache.getMaxSize();
+ return l1Cache.getMaxSize() + l2Cache.getMaxSize();
}
@Override
public long getCurrentDataSize() {
- return onHeapCache.getCurrentDataSize() + l2Cache.getCurrentDataSize();
+ return l1Cache.getCurrentDataSize() + l2Cache.getCurrentDataSize();
}
@Override
public long getFreeSize() {
- return onHeapCache.getFreeSize() + l2Cache.getFreeSize();
+ return l1Cache.getFreeSize() + l2Cache.getFreeSize();
}
@Override
public long getCurrentSize() {
- return onHeapCache.getCurrentSize() + l2Cache.getCurrentSize();
+ return l1Cache.getCurrentSize() + l2Cache.getCurrentSize();
}
@Override
public long getBlockCount() {
- return onHeapCache.getBlockCount() + l2Cache.getBlockCount();
+ return l1Cache.getBlockCount() + l2Cache.getBlockCount();
}
@Override
public long getDataBlockCount() {
- return onHeapCache.getDataBlockCount() + l2Cache.getDataBlockCount();
+ return l1Cache.getDataBlockCount() + l2Cache.getDataBlockCount();
}
public static class CombinedCacheStats extends CacheStats {
@@ -332,7 +333,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
lruCacheStats.rollMetricsPeriod();
bucketCacheStats.rollMetricsPeriod();
}
-
+
@Override
public long getFailedInserts() {
return lruCacheStats.getFailedInserts() + bucketCacheStats.getFailedInserts();
@@ -343,13 +344,13 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
return lruCacheStats.getSumHitCountsPastNPeriods()
+ bucketCacheStats.getSumHitCountsPastNPeriods();
}
-
+
@Override
public long getSumRequestCountsPastNPeriods() {
return lruCacheStats.getSumRequestCountsPastNPeriods()
+ bucketCacheStats.getSumRequestCountsPastNPeriods();
}
-
+
@Override
public long getSumHitCachingCountsPastNPeriods() {
return lruCacheStats.getSumHitCachingCountsPastNPeriods()
@@ -370,12 +371,12 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
@Override
public BlockCache[] getBlockCaches() {
- return new BlockCache [] {this.onHeapCache, this.l2Cache};
+ return new BlockCache [] {this.l1Cache, this.l2Cache};
}
@Override
public void setMaxSize(long size) {
- this.onHeapCache.setMaxSize(size);
+ this.l1Cache.setMaxSize(size);
}
@Override
@@ -390,7 +391,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
? ((BucketCache) this.l2Cache).getRefCount(cacheKey) : 0;
}
- public LruBlockCache getOnHeapCache() {
- return onHeapCache;
+ public FirstLevelBlockCache getFirstLevelCache() {
+ return l1Cache;
}
}
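The routing contract described in the class javadoc above can be restated in a few
lines (an illustrative condensation, not code from this patch; the helper name route
is hypothetical): meta blocks (index/bloom) are cached in L1, data blocks in L2, and
reads consult L1 first.

    static Cacheable route(FirstLevelBlockCache l1, BlockCache l2, BlockCacheKey key,
        boolean caching, boolean repeat, boolean updateMetrics) {
      return l1.containsBlock(key)
          ? l1.getBlock(key, caching, repeat, updateMetrics)
          : l2.getBlock(key, caching, repeat, updateMetrics);
    }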
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java
new file mode 100644
index 0000000..a0c34c9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FirstLevelBlockCache.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * In-memory BlockCache that may be backed by secondary layer(s).
+ */
[email protected]
+public interface FirstLevelBlockCache extends ResizableBlockCache, HeapSize {
+
+ /**
+ * Whether the cache contains the block with specified cacheKey
+ *
+ * @param cacheKey cache key for the block
+ * @return true if it contains the block
+ */
+ boolean containsBlock(BlockCacheKey cacheKey);
+
+ /**
+ * Specifies the secondary cache. An entry that is evicted from this cache due to a size
+ * constraint will be inserted into the victim cache.
+ *
+ * @param victimCache the second level cache
+ * @throws IllegalArgumentException if the victim cache had already been set
+ */
+ void setVictimCache(BlockCache victimCache);
+}
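This victim-cache contract is what lets either L1 implementation sit in front of an
L2. A minimal wiring sketch (sizes are illustrative, and a second LruBlockCache stands
in for a real L2 such as BucketCache):

    Configuration conf = HBaseConfiguration.create();
    FirstLevelBlockCache l1 = new LruBlockCache(32L << 20, 64 * 1024, conf);
    BlockCache l2 = new LruBlockCache(128L << 20, 64 * 1024, conf); // stand-in for BucketCache
    l1.setVictimCache(l2); // size-based evictions from l1 now flow into l2
    // a second call to l1.setVictimCache(...) would throw IllegalArgumentException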
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java
index 823576f..8b85c68 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java
@@ -23,7 +23,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class InclusiveCombinedBlockCache extends CombinedBlockCache {
- public InclusiveCombinedBlockCache(LruBlockCache l1, BlockCache l2) {
+ public InclusiveCombinedBlockCache(FirstLevelBlockCache l1, BlockCache l2) {
super(l1,l2);
l1.setVictimCache(l2);
}
@@ -34,7 +34,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache {
// On all external cache set ups the lru should have the l2 cache set as the victimHandler
// Because of that all requests that miss inside of the lru block cache will be
// tried in the l2 block cache.
- return onHeapCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+ return l1Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
}
/**
@@ -48,7 +48,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache {
public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
// This is the inclusive part of the combined block cache.
// Every block is placed into both block caches.
- onHeapCache.cacheBlock(cacheKey, buf, inMemory);
+ l1Cache.cacheBlock(cacheKey, buf, inMemory);
// This assumes that insertion into the L2 block cache is either async or very fast.
l2Cache.cacheBlock(cacheKey, buf, inMemory);
@@ -56,7 +56,7 @@ public class InclusiveCombinedBlockCache extends CombinedBlockCache {
@Override
public boolean evictBlock(BlockCacheKey cacheKey) {
- boolean l1Result = this.onHeapCache.evictBlock(cacheKey);
+ boolean l1Result = this.l1Cache.evictBlock(cacheKey);
boolean l2Result = this.l2Cache.evictBlock(cacheKey);
return l1Result || l2Result;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index c21935f..ecbf37c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.io.hfile;
+import static java.util.Objects.requireNonNull;
+
import java.lang.ref.WeakReference;
import java.util.EnumMap;
import java.util.Iterator;
@@ -91,7 +93,7 @@ import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFacto
* to the relative sizes and usage.
*/
@InterfaceAudience.Private
-public class LruBlockCache implements ResizableBlockCache, HeapSize {
+public class LruBlockCache implements FirstLevelBlockCache {
private static final Logger LOG = LoggerFactory.getLogger(LruBlockCache.class);
@@ -252,8 +254,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
DEFAULT_MEMORY_FACTOR,
DEFAULT_HARD_CAPACITY_LIMIT_FACTOR,
false,
- DEFAULT_MAX_BLOCK_SIZE
- );
+ DEFAULT_MAX_BLOCK_SIZE);
}
public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, Configuration conf) {
@@ -269,8 +270,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
conf.getFloat(LRU_HARD_CAPACITY_LIMIT_FACTOR_CONFIG_NAME, DEFAULT_HARD_CAPACITY_LIMIT_FACTOR),
conf.getBoolean(LRU_IN_MEMORY_FORCE_MODE_CONFIG_NAME, DEFAULT_IN_MEMORY_FORCE_MODE),
- conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE)
- );
+ conf.getLong(LRU_MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE));
}
public LruBlockCache(long maxSize, long blockSize, Configuration conf) {
@@ -339,6 +339,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
}
@Override
+ public void setVictimCache(BlockCache victimCache) {
+ if (victimHandler != null) {
+ throw new IllegalArgumentException("The victim cache has already been set");
+ }
+ victimHandler = requireNonNull(victimCache);
+ }
+
+ @Override
public void setMaxSize(long maxSize) {
this.maxSize = maxSize;
if (this.size.get() > acceptableSize() && !evictionInProgress) {
@@ -505,6 +513,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
*
* @return true if contains the block
*/
+ @Override
public boolean containsBlock(BlockCacheKey cacheKey) {
return map.containsKey(cacheKey);
}
@@ -1155,11 +1164,6 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
return counts;
}
- public void setVictimCache(BlockCache handler) {
- assert victimHandler == null;
- victimHandler = handler;
- }
-
@VisibleForTesting
Map<BlockCacheKey, LruCachedBlock> getMapForTests() {
return map;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java
new file mode 100644
index 0000000..fbad3e3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/TinyLfuBlockCache.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Policy.Eviction;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A block cache that is memory-aware using {@link HeapSize}, memory bounded using the W-TinyLFU
+ * eviction algorithm, and concurrent. This implementation delegates to a Caffeine cache to provide
+ * O(1) read and write operations.
+ * <ul>
+ * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
+ * <li>Caffeine: https://github.com/ben-manes/caffeine</li>
+ * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
+ * </ul>
+ */
[email protected]
+public final class TinyLfuBlockCache implements FirstLevelBlockCache {
+ private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCache.class);
+
+ private static final String MAX_BLOCK_SIZE = "hbase.tinylfu.max.block.size";
+ private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L;
+ private static final int STAT_THREAD_PERIOD_SECONDS = 5 * 60;
+
+ private final Eviction<BlockCacheKey, Cacheable> policy;
+ private final ScheduledExecutorService statsThreadPool;
+ private final long maxBlockSize;
+ private final CacheStats stats;
+
+ private BlockCache victimCache;
+
+ @VisibleForTesting
+ final Cache<BlockCacheKey, Cacheable> cache;
+
+ /**
+ * Creates a block cache.
+ *
+ * @param maximumSizeInBytes maximum size of this cache, in bytes
+ * @param avgBlockSize expected average size of blocks, in bytes
+ * @param executor the cache's executor
+ * @param conf additional configuration
+ */
+ public TinyLfuBlockCache(long maximumSizeInBytes, long avgBlockSize,
+ Executor executor, Configuration conf) {
+ this(maximumSizeInBytes, avgBlockSize,
+ conf.getLong(MAX_BLOCK_SIZE, DEFAULT_MAX_BLOCK_SIZE), executor);
+ }
+
+ /**
+ * Creates a block cache.
+ *
+ * @param maximumSizeInBytes maximum size of this cache, in bytes
+ * @param avgBlockSize expected average size of blocks, in bytes
+ * @param maxBlockSize maximum size of a block, in bytes
+ * @param executor the cache's executor
+ */
+ public TinyLfuBlockCache(long maximumSizeInBytes,
+ long avgBlockSize, long maxBlockSize, Executor executor) {
+ this.cache = Caffeine.newBuilder()
+ .executor(executor)
+ .maximumWeight(maximumSizeInBytes)
+ .removalListener(new EvictionListener())
+ .weigher((BlockCacheKey key, Cacheable value) ->
+ (int) Math.min(value.heapSize(), Integer.MAX_VALUE))
+ .initialCapacity((int) Math.ceil((1.2 * maximumSizeInBytes) / avgBlockSize))
+ .build();
+ this.maxBlockSize = maxBlockSize;
+ this.policy = cache.policy().eviction().get();
+ this.stats = new CacheStats(getClass().getSimpleName());
+
+ statsThreadPool = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true).build());
+ statsThreadPool.scheduleAtFixedRate(this::logStats,
+ STAT_THREAD_PERIOD_SECONDS, STAT_THREAD_PERIOD_SECONDS, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void setVictimCache(BlockCache victimCache) {
+ if (this.victimCache != null) {
+ throw new IllegalArgumentException("The victim cache has already been set");
+ }
+ this.victimCache = requireNonNull(victimCache);
+ }
+
+ @Override
+ public long size() {
+ return policy.getMaximum();
+ }
+
+ @Override
+ public long getFreeSize() {
+ return size() - getCurrentSize();
+ }
+
+ @Override
+ public long getCurrentSize() {
+ return policy.weightedSize().getAsLong();
+ }
+
+ @Override
+ public long getBlockCount() {
+ return cache.estimatedSize();
+ }
+
+ @Override
+ public long heapSize() {
+ return getCurrentSize();
+ }
+
+ @Override
+ public void setMaxSize(long size) {
+ policy.setMaximum(size);
+ }
+
+ @Override
+ public boolean containsBlock(BlockCacheKey cacheKey) {
+ return cache.asMap().containsKey(cacheKey);
+ }
+
+ @Override
+ public Cacheable getBlock(BlockCacheKey cacheKey,
+ boolean caching, boolean repeat, boolean updateCacheMetrics) {
+ Cacheable value = cache.getIfPresent(cacheKey);
+ if (value == null) {
+ if (repeat) {
+ return null;
+ }
+ if (updateCacheMetrics) {
+ stats.miss(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+ }
+ if (victimCache != null) {
+ value = victimCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+ if ((value != null) && caching) {
+ if ((value instanceof HFileBlock) && ((HFileBlock) value).usesSharedMemory()) {
+ value = ((HFileBlock) value).deepClone();
+ }
+ cacheBlock(cacheKey, value);
+ }
+ }
+ } else if (updateCacheMetrics) {
+ stats.hit(caching, cacheKey.isPrimary(), cacheKey.getBlockType());
+ }
+ return value;
+ }
+
+ @Override
+ public void cacheBlock(BlockCacheKey cacheKey, Cacheable value, boolean inMemory) {
+ cacheBlock(cacheKey, value);
+ }
+
+ @Override
+ public void cacheBlock(BlockCacheKey key, Cacheable value) {
+ if (value.heapSize() > maxBlockSize) {
+ // If there are a lot of blocks that are too big this can make the logs too noisy (2% logged)
+ if (stats.failInsert() % 50 == 0) {
+ LOG.warn(String.format(
+ "Trying to cache too large a block %s @ %,d is %,d which is larger than %,d",
+ key.getHfileName(), key.getOffset(), value.heapSize(), DEFAULT_MAX_BLOCK_SIZE));
+ }
+ } else {
+ cache.put(key, value);
+ }
+ }
+
+ @Override
+ public boolean evictBlock(BlockCacheKey cacheKey) {
+ Cacheable value = cache.asMap().remove(cacheKey);
+ return (value != null);
+ }
+
+ @Override
+ public int evictBlocksByHfileName(String hfileName) {
+ int evicted = 0;
+ for (BlockCacheKey key : cache.asMap().keySet()) {
+ if (key.getHfileName().equals(hfileName) && evictBlock(key)) {
+ evicted++;
+ }
+ }
+ if (victimCache != null) {
+ evicted += victimCache.evictBlocksByHfileName(hfileName);
+ }
+ return evicted;
+ }
+
+ @Override
+ public CacheStats getStats() {
+ return stats;
+ }
+
+ @Override
+ public void shutdown() {
+ if (victimCache != null) {
+ victimCache.shutdown();
+ }
+ statsThreadPool.shutdown();
+ }
+
+ @Override
+ public BlockCache[] getBlockCaches() {
+ return null;
+ }
+
+ @Override
+ public Iterator<CachedBlock> iterator() {
+ long now = System.nanoTime();
+ return cache.asMap().entrySet().stream()
+ .map(entry -> (CachedBlock) new CachedBlockView(entry.getKey(), entry.getValue(), now))
+ .iterator();
+ }
+
+ @Override
+ public void returnBlock(BlockCacheKey cacheKey, Cacheable block) {
+ // There is no SHARED type here in L1. But the block might have been served from the L2 victim
+ // cache (when the Combined mode = false). So just try return this block to the victim cache.
+ // Note : In case of CombinedBlockCache we will have this victim cache configured for L1
+ // cache. But CombinedBlockCache will only call returnBlock on L2 cache.
+ if (victimCache != null) {
+ victimCache.returnBlock(cacheKey, block);
+ }
+ }
+
+ private void logStats() {
+ LOG.info(
+ "totalSize=" + StringUtils.byteDesc(heapSize()) + ", " +
+ "freeSize=" + StringUtils.byteDesc(getFreeSize()) + ", " +
+ "max=" + StringUtils.byteDesc(size()) + ", " +
+ "blockCount=" + getBlockCount() + ", " +
+ "accesses=" + stats.getRequestCount() + ", " +
+ "hits=" + stats.getHitCount() + ", " +
+ "hitRatio=" + (stats.getHitCount() == 0 ?
+ "0," : StringUtils.formatPercent(stats.getHitRatio(), 2) + ", ") +
+ "cachingAccesses=" + stats.getRequestCachingCount() + ", " +
+ "cachingHits=" + stats.getHitCachingCount() + ", " +
+ "cachingHitsRatio=" + (stats.getHitCachingCount() == 0 ?
+ "0,": (StringUtils.formatPercent(stats.getHitCachingRatio(), 2) + ", ")) +
+ "evictions=" + stats.getEvictionCount() + ", " +
+ "evicted=" + stats.getEvictedCount());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("blockCount", getBlockCount())
+ .add("currentSize", getCurrentSize())
+ .add("freeSize", getFreeSize())
+ .add("maxSize", size())
+ .add("heapSize", heapSize())
+ .add("victimCache", (victimCache != null))
+ .toString();
+ }
+
+ /** A removal listener to asynchronously record evictions and populate the victim cache. */
+ private final class EvictionListener implements RemovalListener<BlockCacheKey, Cacheable> {
+
+ @Override
+ public void onRemoval(BlockCacheKey key, Cacheable value, RemovalCause cause) {
+ if (!cause.wasEvicted()) {
+ // An explicit eviction (invalidation) is not added to the victim cache as the data may
+ // no longer be valid for subsequent queries.
+ return;
+ }
+
+ recordEviction();
+
+ if (victimCache == null) {
+ return;
+ } else if (victimCache instanceof BucketCache) {
+ BucketCache victimBucketCache = (BucketCache) victimCache;
+ victimBucketCache.cacheBlockWithWait(key, value, /* inMemory */ true, /* wait */ true);
+ } else {
+ victimCache.cacheBlock(key, value);
+ }
+ }
+ }
+
+ /**
+ * Records an eviction. The number of eviction operations and evicted blocks are identical, as
+ * an eviction is triggered immediately when the capacity has been exceeded. An eviction is
+ * performed asynchronously. See the library's documentation for details on write buffers,
+ * batching, and maintenance behavior.
+ */
+ private void recordEviction() {
+ // FIXME: Currently does not capture the insertion time
+ stats.evicted(Long.MAX_VALUE, true);
+ stats.evict();
+ }
+
+ private static final class CachedBlockView implements CachedBlock {
+ private static final Comparator<CachedBlock> COMPARATOR = Comparator
+ .comparing(CachedBlock::getFilename)
+ .thenComparing(CachedBlock::getOffset)
+ .thenComparing(CachedBlock::getCachedTime);
+
+ private final BlockCacheKey key;
+ private final Cacheable value;
+ private final long now;
+
+ public CachedBlockView(BlockCacheKey key, Cacheable value, long now) {
+ this.now = now;
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public BlockPriority getBlockPriority() {
+ // This does not appear to be used in any meaningful way and is irrelevant to this cache
+ return BlockPriority.MEMORY;
+ }
+
+ @Override
+ public BlockType getBlockType() {
+ return value.getBlockType();
+ }
+
+ @Override
+ public long getOffset() {
+ return key.getOffset();
+ }
+
+ @Override
+ public long getSize() {
+ return value.heapSize();
+ }
+
+ @Override
+ public long getCachedTime() {
+ // This does not appear to be used in any meaningful way, so not captured
+ return 0L;
+ }
+
+ @Override
+ public String getFilename() {
+ return key.getHfileName();
+ }
+
+ @Override
+ public int compareTo(CachedBlock other) {
+ return COMPARATOR.compare(this, other);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ } else if (!(obj instanceof CachedBlock)) {
+ return false;
+ }
+ CachedBlock other = (CachedBlock) obj;
+ return compareTo(other) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return key.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return BlockCacheUtil.toString(this, now);
+ }
+ }
+
+ @Override
+ public long getMaxSize() {
+ return size();
+ }
+
+ @Override
+ public long getCurrentDataSize() {
+ return getCurrentSize();
+ }
+
+ @Override
+ public long getDataBlockCount() {
+ return getBlockCount();
+ }
+}
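As a standalone illustration of the new class (a sketch under stated assumptions, not
part of the patch; the unit test below exercises it more thoroughly):

    // Sizes are illustrative. Runnable::run makes maintenance synchronous, which is
    // convenient for experiments; production use passes a real executor such as
    // ForkJoinPool.commonPool(), as BlockCacheFactory does above.
    static Cacheable demo(BlockCacheKey key, Cacheable block) {
      TinyLfuBlockCache cache = new TinyLfuBlockCache(
          64L * 1024 * 1024, // maximum cache size in bytes
          64 * 1024,         // expected average block size
          16L * 1024 * 1024, // maximum cacheable block size
          Runnable::run);
      cache.cacheBlock(key, block); // weighted by Cacheable.heapSize()
      return cache.getBlock(key, /* caching */ true, /* repeat */ false, /* metrics */ true);
    }

Blocks larger than the configured maximum are rejected up front and counted as failed
inserts rather than being admitted and immediately evicted.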
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 105e9d8..1c418f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -94,11 +94,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
*
* <p>BucketCache can be used as mainly a block cache (see
* {@link org.apache.hadoop.hbase.io.hfile.CombinedBlockCache}), combined with
- * LruBlockCache to decrease CMS GC and heap fragmentation.
+ * a BlockCache to decrease CMS GC and heap fragmentation.
*
* <p>It also can be used as a secondary cache (e.g. using a file on ssd/fusionio to store
- * blocks) to enlarge cache space via
- * {@link org.apache.hadoop.hbase.io.hfile.LruBlockCache#setVictimCache}
+ * blocks) to enlarge cache space via a victim cache.
*/
@InterfaceAudience.Private
public class BucketCache implements BlockCache, HeapSize {
@@ -422,7 +421,7 @@ public class BucketCache implements BlockCache, HeapSize {
* @param inMemory if block is in-memory
* @param wait if true, blocking wait when queue is full
*/
- private void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
+ public void cacheBlockWithWait(BlockCacheKey cacheKey, Cacheable cachedItem, boolean inMemory,
boolean wait) {
if (cacheEnabled) {
if (backingMap.containsKey(cacheKey) || ramCache.containsKey(cacheKey)) {
@@ -693,7 +692,7 @@ public class BucketCache implements BlockCache, HeapSize {
return this.realCacheSize.sum();
}
- private long acceptableSize() {
+ public long acceptableSize() {
return (long) Math.floor(bucketAllocator.getTotalSize() * acceptableFactor);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index a96417d..ea5586c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -124,7 +124,7 @@ public class HeapMemoryManager {
private ResizableBlockCache toResizableBlockCache(BlockCache blockCache) {
if (blockCache instanceof CombinedBlockCache) {
- return (ResizableBlockCache) ((CombinedBlockCache) blockCache).getOnHeapCache();
+ return (ResizableBlockCache) ((CombinedBlockCache) blockCache).getFirstLevelCache();
} else {
return (ResizableBlockCache) blockCache;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java
index 1313f31..10f18a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBlockCacheReporting.java
@@ -98,7 +98,6 @@ public class TestBlockCacheReporting {
CacheConfig cc = new CacheConfig(this.conf);
assertTrue(CacheConfig.DEFAULT_IN_MEMORY == cc.isInMemory());
BlockCache blockCache = BlockCacheFactory.createBlockCache(this.conf);
- assertTrue(blockCache instanceof LruBlockCache);
logPerBlock(blockCache);
addDataAndHits(blockCache, 3);
// The below has no asserts. It is just exercising toString and toJSON code.
@@ -140,7 +139,7 @@ public class TestBlockCacheReporting {
}
LOG.info("filename=" + e.getKey() + ", count=" + count + ", countData=" + countData +
", size=" + size + ", sizeData=" + sizeData);
- LOG.info(BlockCacheUtil.toJSON(e.getKey(), e.getValue()));
+ //LOG.info(BlockCacheUtil.toJSON(e.getKey(), e.getValue()));
}
}
@@ -148,7 +147,7 @@ public class TestBlockCacheReporting {
BlockCacheUtil.CachedBlocksByFile cbsbf = new BlockCacheUtil.CachedBlocksByFile();
for (CachedBlock cb : bc) {
LOG.info(cb.toString());
- LOG.info(BlockCacheUtil.toJSON(bc));
+ //LOG.info(BlockCacheUtil.toJSON(bc));
cbsbf.update(cb);
}
return cbsbf;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
index 4c56fff..0b9cc19 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
@@ -336,7 +336,7 @@ public class TestCacheConfig {
assertTrue(blockCache instanceof CombinedBlockCache);
// TODO: Assert sizes allocated are right and proportions.
CombinedBlockCache cbc = (CombinedBlockCache) blockCache;
- LruBlockCache lbc = cbc.onHeapCache;
+ FirstLevelBlockCache lbc = cbc.l1Cache;
assertEquals(lruExpectedSize, lbc.getMaxSize());
BlockCache bc = cbc.l2Cache;
// getMaxSize comes back in bytes but we specified size in MB
@@ -350,7 +350,7 @@ public class TestCacheConfig {
assertEquals(initialL1BlockCount + 1, lbc.getBlockCount());
assertEquals(initialL2BlockCount, bc.getBlockCount());
// Force evictions by putting in a block too big.
- final long justTooBigSize = lbc.acceptableSize() + 1;
+ final long justTooBigSize = ((LruBlockCache)lbc).acceptableSize() + 1;
lbc.cacheBlock(new BlockCacheKey("bck2", 0), new DataCacheEntry() {
@Override
public long heapSize() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
index ed440e7..1fcbdaa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestScannerSelectionUsingKeyRange.java
@@ -123,16 +123,17 @@ public class TestScannerSelectionUsingKeyRange {
}
Scan scan = new Scan(Bytes.toBytes("aaa"), Bytes.toBytes("aaz"));
- LruBlockCache cache = (LruBlockCache) BlockCacheFactory.createBlockCache(conf);
- cache.clearCache();
+ BlockCache cache = BlockCacheFactory.createBlockCache(conf);
InternalScanner scanner = region.getScanner(scan);
List<Cell> results = new ArrayList<>();
while (scanner.next(results)) {
}
scanner.close();
assertEquals(0, results.size());
- Set<String> accessedFiles = cache.getCachedFileNamesForTest();
- assertEquals(expectedCount, accessedFiles.size());
+ if (cache instanceof LruBlockCache) {
+ Set<String> accessedFiles = ((LruBlockCache)cache).getCachedFileNamesForTest();
+ assertEquals(expectedCount, accessedFiles.size());
+ }
HBaseTestingUtility.closeRegionAndWAL(region);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestTinyLfuBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestTinyLfuBlockCache.java
new file mode 100644
index 0000000..9a333f8
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestTinyLfuBlockCache.java
@@ -0,0 +1,309 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ClassSize;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests the concurrent TinyLfuBlockCache.
+ */
+@Category({IOTests.class, SmallTests.class})
+public class TestTinyLfuBlockCache {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestTinyLfuBlockCache.class);
+
+ @Test
+ public void testCacheSimple() throws Exception {
+
+ long maxSize = 1000000;
+ long blockSize = calculateBlockSizeDefault(maxSize, 101);
+
+ TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
+
+ CachedItem [] blocks = generateRandomBlocks(100, blockSize);
+
+ long expectedCacheSize = cache.heapSize();
+
+ // Confirm empty
+ for (CachedItem block : blocks) {
+ assertTrue(cache.getBlock(block.cacheKey, true, false, true) == null);
+ }
+
+ // Add blocks
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.cacheKey, block);
+ expectedCacheSize += block.heapSize();
+ }
+
+ // Verify correctly calculated cache heap size
+ assertEquals(expectedCacheSize, cache.heapSize());
+
+ // Check if all blocks are properly cached and retrieved
+ for (CachedItem block : blocks) {
+ HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
+ assertTrue(buf != null);
+ assertEquals(buf.heapSize(), block.heapSize());
+ }
+
+ // Re-add same blocks and ensure nothing has changed
+ long expectedBlockCount = cache.getBlockCount();
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.cacheKey, block);
+ }
+ assertEquals(
+ "Cache should ignore cache requests for blocks already in cache",
+ expectedBlockCount, cache.getBlockCount());
+
+ // Verify correctly calculated cache heap size
+ assertEquals(expectedCacheSize, cache.heapSize());
+
+ // Check if all blocks are properly cached and retrieved
+ for (CachedItem block : blocks) {
+ HeapSize buf = cache.getBlock(block.cacheKey, true, false, true);
+ assertTrue(buf != null);
+ assertEquals(buf.heapSize(), block.heapSize());
+ }
+
+ // Expect no evictions
+ assertEquals(0, cache.getStats().getEvictionCount());
+ }
+
+ @Test
+ public void testCacheEvictionSimple() throws Exception {
+
+ long maxSize = 100000;
+ long blockSize = calculateBlockSizeDefault(maxSize, 10);
+
+ TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
+
+ CachedItem [] blocks = generateFixedBlocks(11, blockSize, "block");
+
+ // Add all the blocks
+ for (CachedItem block : blocks) {
+ cache.cacheBlock(block.cacheKey, block);
+ }
+
+ // A single eviction run should have occurred
+ assertEquals(1, cache.getStats().getEvictionCount());
+
+ // The cache did not grow beyond max
+ assertTrue(cache.heapSize() < maxSize);
+
+ // All blocks except one should be in the cache
+ assertEquals(10, cache.getBlockCount());
+ }
+
+ @Test
+ public void testScanResistance() throws Exception {
+
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+
+ TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
+
+ CachedItem [] singleBlocks = generateFixedBlocks(20, blockSize, "single");
+ CachedItem [] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
+
+ // Add 5 blocks from each
+ for(int i=0; i<5; i++) {
+ cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+ cache.cacheBlock(multiBlocks[i].cacheKey, multiBlocks[i]);
+ }
+
+ // Add frequency
+ for (int i = 0; i < 5; i++) {
+ for (int j = 0; j < 10; j++) {
+ CachedItem block = multiBlocks[i];
+ cache.getBlock(block.cacheKey, true, false, true);
+ }
+ }
+
+ // Let's keep "scanning" by adding single blocks. From here on we only
+ // expect evictions from the single bucket.
+
+ for(int i=5;i<18;i++) {
+ cache.cacheBlock(singleBlocks[i].cacheKey, singleBlocks[i]);
+ }
+
+ for (CachedItem block : multiBlocks) {
+ assertTrue(cache.cache.asMap().containsKey(block.cacheKey));
+ }
+
+ assertEquals(10, cache.getBlockCount());
+ assertEquals(13, cache.getStats().getEvictionCount());
+
+ }
+
+ @Test
+ public void testMaxBlockSize() throws Exception {
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+
+ TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
+ CachedItem [] tooLong = generateFixedBlocks(10, 2 * blockSize, "long");
+ CachedItem [] small = generateFixedBlocks(15, blockSize / 2, "small");
+
+ for (CachedItem i:tooLong) {
+ cache.cacheBlock(i.cacheKey, i);
+ }
+ for (CachedItem i:small) {
+ cache.cacheBlock(i.cacheKey, i);
+ }
+ assertEquals(15,cache.getBlockCount());
+ for (CachedItem i:small) {
+ assertNotNull(cache.getBlock(i.cacheKey, true, false, false));
+ }
+ for (CachedItem i:tooLong) {
+ assertNull(cache.getBlock(i.cacheKey, true, false, false));
+ }
+
+ assertEquals(10, cache.getStats().getFailedInserts());
+ }
+
+ @Test
+ public void testResizeBlockCache() throws Exception {
+
+ long maxSize = 100000;
+ long blockSize = calculateBlockSize(maxSize, 10);
+
+ TinyLfuBlockCache cache = new TinyLfuBlockCache(maxSize, blockSize, blockSize, Runnable::run);
+
+ CachedItem [] blocks = generateFixedBlocks(10, blockSize, "block");
+
+ for(CachedItem block : blocks) {
+ cache.cacheBlock(block.cacheKey, block);
+ }
+
+ // Do not expect any evictions yet
+ assertEquals(10, cache.getBlockCount());
+ assertEquals(0, cache.getStats().getEvictionCount());
+
+ // Resize to half capacity plus an extra block (otherwise we evict an extra)
+ cache.setMaxSize(maxSize / 2);
+
+ // And we expect 1/2 of the blocks to be evicted
+ assertEquals(5, cache.getBlockCount());
+ assertEquals(5, cache.getStats().getEvictedCount());
+ }
+
+ private CachedItem [] generateFixedBlocks(int numBlocks, int size, String pfx) {
+ CachedItem [] blocks = new CachedItem[numBlocks];
+ for(int i=0;i<numBlocks;i++) {
+ blocks[i] = new CachedItem(pfx + i, size);
+ }
+ return blocks;
+ }
+
+ private CachedItem [] generateFixedBlocks(int numBlocks, long size, String pfx) {
+ return generateFixedBlocks(numBlocks, (int)size, pfx);
+ }
+
+ private CachedItem [] generateRandomBlocks(int numBlocks, long maxSize) {
+ CachedItem [] blocks = new CachedItem[numBlocks];
+ Random r = new Random();
+ for(int i=0;i<numBlocks;i++) {
+ blocks[i] = new CachedItem("block" + i, r.nextInt((int)maxSize)+1);
+ }
+ return blocks;
+ }
+
+ private long calculateBlockSize(long maxSize, int numBlocks) {
+ long roughBlockSize = maxSize / numBlocks;
+ int numEntries = (int)Math.ceil((1.2)*maxSize/roughBlockSize);
+ long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD +
+ ClassSize.CONCURRENT_HASHMAP +
+ (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
+ (LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+ long negateBlockSize = totalOverhead/numEntries;
+ negateBlockSize += LruCachedBlock.PER_BLOCK_OVERHEAD;
+ return ClassSize.align((long)Math.floor((roughBlockSize - negateBlockSize)*0.99f));
+ }
+
+ private long calculateBlockSizeDefault(long maxSize, int numBlocks) {
+ long roughBlockSize = maxSize / numBlocks;
+ int numEntries = (int)Math.ceil((1.2)*maxSize/roughBlockSize);
+ long totalOverhead = LruBlockCache.CACHE_FIXED_OVERHEAD +
+ ClassSize.CONCURRENT_HASHMAP +
+ (numEntries * ClassSize.CONCURRENT_HASHMAP_ENTRY) +
+ (LruBlockCache.DEFAULT_CONCURRENCY_LEVEL * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+ long negateBlockSize = totalOverhead / numEntries;
+ negateBlockSize += LruCachedBlock.PER_BLOCK_OVERHEAD;
+ return ClassSize.align((long)Math.floor((roughBlockSize - negateBlockSize)*
+ LruBlockCache.DEFAULT_ACCEPTABLE_FACTOR));
+ }
+
+ private static class CachedItem implements Cacheable {
+ BlockCacheKey cacheKey;
+ int size;
+
+ CachedItem(String blockName, int size) {
+ this.cacheKey = new BlockCacheKey(blockName, 0);
+ this.size = size;
+ }
+
+ /** The size of this item reported to the block cache layer */
+ @Override
+ public long heapSize() {
+ return ClassSize.align(size);
+ }
+
+ @Override
+ public int getSerializedLength() {
+ return 0;
+ }
+
+ @Override
+ public CacheableDeserializer<Cacheable> getDeserializer() {
+ return null;
+ }
+
+ @Override
+ public BlockType getBlockType() {
+ return BlockType.DATA;
+ }
+
+ @Override
+ public MemoryType getMemoryType() {
+ return MemoryType.EXCLUSIVE;
+ }
+
+ @Override
+ public void serialize(ByteBuffer destination, boolean includeNextBlockMetadata) {
+ }
+ }
+
+}
+
diff --git a/hbase-shaded/pom.xml b/hbase-shaded/pom.xml
index 0c5b046..b585b02 100644
--- a/hbase-shaded/pom.xml
+++ b/hbase-shaded/pom.xml
@@ -172,6 +172,10 @@
<shadedPattern>${shaded.prefix}.com.fasterxml</shadedPattern>
</relocation>
<relocation>
+ <pattern>com.github.benmanes.caffeine</pattern>
+ <shadedPattern>${shaded.prefix}.com.github.benmanes.caffeine</shadedPattern>
+ </relocation>
+ <relocation>
<pattern>com.google</pattern>
<shadedPattern>${shaded.prefix}.com.google</shadedPattern>
</relocation>
diff --git a/pom.xml b/pom.xml
index a550489..5759e40 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1508,6 +1508,7 @@
<!-- end HBASE-15925 default hadoop compatibility values -->
<audience-annotations.version>0.5.0</audience-annotations.version>
<avro.version>1.7.7</avro.version>
+ <caffeine.version>2.6.2</caffeine.version>
<commons-codec.version>1.10</commons-codec.version>
<commons-validator.version>1.6</commons-validator.version>
<!-- pretty outdated -->
@@ -1920,6 +1921,11 @@
<version>${slf4j.version}</version>
</dependency>
<dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>${caffeine.version}</version>
+ </dependency>
+ <dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics-core.version}</version>