ACCUMULO-4463: changes from review
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/40c1cb0b Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/40c1cb0b Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/40c1cb0b Branch: refs/heads/ACCUMULO-4463 Commit: 40c1cb0b6b3b466aad5ba835344bbc7edccae7dc Parents: 03a8f2a Author: Dave Marion <[email protected]> Authored: Mon May 8 15:23:35 2017 -0400 Committer: Dave Marion <[email protected]> Committed: Mon May 8 15:23:35 2017 -0400 ---------------------------------------------------------------------- .../core/client/rfile/RFileScanner.java | 33 +- .../org/apache/accumulo/core/conf/Property.java | 4 +- .../core/file/blockfile/cache/BlockCache.java | 11 +- .../cache/BlockCacheConfiguration.java | 82 ++ .../file/blockfile/cache/BlockCacheFactory.java | 7 +- .../core/file/blockfile/cache/CachedBlock.java | 4 +- .../file/blockfile/cache/LruBlockCache.java | 746 ------------------- .../file/blockfile/cache/TinyLfuBlockCache.java | 144 ---- .../file/blockfile/cache/lru/LruBlockCache.java | 637 ++++++++++++++++ .../cache/lru/LruBlockCacheConfiguration.java | 122 +++ .../cache/lru/LruBlockCacheFactory.java | 31 + .../cache/tinylfu/TinyLfuBlockCache.java | 149 ++++ .../tinylfu/TinyLfuBlockCacheConfiguration.java | 29 + .../cache/tinylfu/TinyLfuBlockCacheFactory.java | 31 + .../accumulo/core/summary/SummaryReader.java | 2 +- .../file/blockfile/cache/TestLruBlockCache.java | 181 ++--- .../accumulo/core/file/rfile/RFileTest.java | 20 +- .../tserver/TabletServerResourceManager.java | 26 +- 18 files changed, 1219 insertions(+), 1040 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 57322a8..b4a6d14 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -44,7 +44,8 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; -import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; @@ -56,6 +57,8 @@ import org.apache.accumulo.core.iterators.system.MultiIterator; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.LocalityGroupUtil; +import org.apache.commons.configuration.ConfigurationMap; +import org.apache.commons.configuration.MapConfiguration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.io.Text; @@ -129,7 +132,7 @@ class RFileScanner extends ScannerOptions implements Scanner { } @Override - public void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws Exception {} + public void start() {} @Override public void stop() {} @@ -142,23 +145,25 @@ class RFileScanner extends ScannerOptions implements Scanner { this.opts = opts; if (opts.indexCacheSize > 0) { - this.indexCache = new LruBlockCache(); - try { - this.indexCache.start((AccumuloConfiguration) null, opts.indexCacheSize, CACHE_BLOCK_SIZE); - } catch (Exception 
e) { - throw new RuntimeException("Error starting cache", e); - } + MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); + config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(opts.indexCacheSize)); + config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(CACHE_BLOCK_SIZE)); + @SuppressWarnings("unchecked") + ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); + this.indexCache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); + this.indexCache.start(); } else { this.indexCache = new NoopCache(); } if (opts.dataCacheSize > 0) { - this.dataCache = new LruBlockCache(); - try { - this.dataCache.start((AccumuloConfiguration) null, opts.dataCacheSize, CACHE_BLOCK_SIZE); - } catch (Exception e) { - throw new RuntimeException("Error starting cache", e); - } + MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); + config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(opts.dataCacheSize)); + config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(CACHE_BLOCK_SIZE)); + @SuppressWarnings("unchecked") + ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); + this.dataCache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); + this.dataCache.start(); } else { this.dataCache = new NoopCache(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 0bbaf10..d0f7ce2 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -245,8 +245,8 @@ public enum Property { TSERV_PREFIX("tserver.", null, 
PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"), TSERV_CLIENT_TIMEOUT("tserver.client.timeout", "3s", PropertyType.TIMEDURATION, "Time to wait for clients to continue scans before closing a session."), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the tserver caches"), - TSERV_CACHE_IMPL("tserver.cache.class", "org.apache.accumulo.core.file.blockfile.cache.LruBlockCache.class", PropertyType.STRING, - "Specifies the class name of the block cache implementation."), + TSERV_CACHE_IMPL("tserver.cache.factory.class", "org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheFactory", PropertyType.STRING, + "Specifies the class name of the block cache factory implementation."), TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for file data blocks."), TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "25%", PropertyType.MEMORY, "Specifies the size of the cache for file indices."), TSERV_SUMMARYCACHE_SIZE("tserver.cache.summary.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for summary data on each tablet server."), http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java index c5a17e4..f035f5d 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java @@ -17,8 +17,6 @@ */ package org.apache.accumulo.core.file.blockfile.cache; -import org.apache.accumulo.core.conf.AccumuloConfiguration; - /** * Block cache interface.
*/ @@ -26,15 +24,8 @@ public interface BlockCache { /** * Start the block cache - * - * @param conf - * Accumulo configuration object - * @param maxSize - * maximum size of the on-heap cache - * @param blockSize - * size of the default RFile blocks */ - void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws Exception; + void start(); /** * Stop the block cache and release resources http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java new file mode 100644 index 0000000..e3ccbf5 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.core.file.blockfile.cache; + +import java.util.Map; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; + +public class BlockCacheConfiguration { + + public static final String MAX_SIZE_PROPERTY = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block." + "max.size"; + public static final String BLOCK_SIZE_PROPERTY = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block." + "block.size"; + + private static final Long DEFAULT = Long.valueOf(-1); + + /** Maximum allowable size of cache (block put if size > max, evict) */ + private final long maxSize; + + /** Approximate block size */ + private final long blockSize; + + public BlockCacheConfiguration(AccumuloConfiguration conf) { + Map<String,String> props = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX); + this.maxSize = getOrDefault(props, MAX_SIZE_PROPERTY, DEFAULT); + this.blockSize = getOrDefault(props, BLOCK_SIZE_PROPERTY, DEFAULT); + + if (DEFAULT.equals(this.maxSize)) { + throw new IllegalArgumentException("Block cache max size must be specified."); + } + if (DEFAULT.equals(this.blockSize)) { + throw new IllegalArgumentException("Block cache block size must be specified."); + } + } + + public long getMaxSize() { + return maxSize; + } + + public long getBlockSize() { + return blockSize; + } + + @SuppressWarnings("unchecked") + protected <T> T getOrDefault(Map<String,String> props, String propertyName, T defaultValue) { + String o = props.get(propertyName); + if (null == o && defaultValue == null) { + throw new RuntimeException("Property " + propertyName + " not specified and no default supplied."); + } else if (null == o) { + return defaultValue; + } else { + if (defaultValue.getClass().equals(Integer.class)) { + return (T) Integer.valueOf(o); + } else if (defaultValue.getClass().equals(Long.class)) { + return (T) Long.valueOf(o); + } else if (defaultValue.getClass().equals(Float.class)) { + 
return (T) Float.valueOf(o); + } else if (defaultValue.getClass().equals(Boolean.class)) { + return (T) Boolean.valueOf(o); + } else { + throw new RuntimeException("Unknown parameter type"); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java index 70cbc7d..57ca5f1 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java @@ -21,12 +21,13 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; -public class BlockCacheFactory { +public abstract class BlockCacheFactory { - public static BlockCache getBlockCache(AccumuloConfiguration conf) throws Exception { + public static BlockCacheFactory getBlockCacheFactory(AccumuloConfiguration conf) throws Exception { String impl = conf.get(Property.TSERV_CACHE_IMPL); - Class<? extends BlockCache> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCache.class); + Class<? 
extends BlockCacheFactory> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCacheFactory.class); return clazz.newInstance(); } + public abstract BlockCache getBlockCache(AccumuloConfiguration conf); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java index c67b4c7..b04b77a 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java @@ -19,6 +19,8 @@ package org.apache.accumulo.core.file.blockfile.cache; import java.util.Objects; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; + /** * Represents an entry in the {@link LruBlockCache}. 
* @@ -31,7 +33,7 @@ public class CachedBlock implements HeapSize, Comparable<CachedBlock>, CacheEntr public final static long PER_BLOCK_OVERHEAD = ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + (2 * SizeConstants.SIZEOF_LONG) + ClassSize.STRING + ClassSize.BYTE_BUFFER); - static enum BlockPriority { + public static enum BlockPriority { /** * Accessed a single time (used for scan-resistance) */ http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java deleted file mode 100644 index 921b5a5..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java +++ /dev/null @@ -1,746 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.core.file.blockfile.cache; - -import java.lang.ref.WeakReference; -import java.util.Objects; -import java.util.PriorityQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.util.NamingThreadFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a - * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations. - * - * <p> - * Contains three levels of block priority to allow for scan-resistance and in-memory families. A block is added with an inMemory flag if necessary, otherwise a - * block becomes a single access priority. Once a blocked is accessed again, it changes to multiple access. This is used to prevent scans from thrashing the - * cache, adding a least-frequently-used element to the eviction algorithm. - * - * <p> - * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size, however, if - * any priority is not using its entire chunk the others are able to grow beyond their chunk size. - * - * <p> - * Instantiated at a minimum with the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is - * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map. 
- * - * <p> - * The detailed constructor defines the sizes for the three priorities (they should total to the maximum size defined). It also sets the levels that trigger and - * control the eviction thread. - * - * <p> - * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size - * specified. - * - * <p> - * Eviction happens in a separate thread and involves a single full-scan of the map. It determines how many bytes must be freed to reach the minimum size, and - * then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then - * uses the priority chunk sizes to evict fairly according to the relative sizes and usage. - */ -public class LruBlockCache implements BlockCache, HeapSize { - - private static final Logger log = LoggerFactory.getLogger(LruBlockCache.class); - - /** Default Configuration Parameters */ - - /** Backing Concurrent Map Configuration */ - static final float DEFAULT_LOAD_FACTOR = 0.75f; - static final int DEFAULT_CONCURRENCY_LEVEL = 16; - - /** Eviction thresholds */ - static final float DEFAULT_MIN_FACTOR = 0.75f; - static final float DEFAULT_ACCEPTABLE_FACTOR = 0.85f; - - /** Priority buckets */ - static final float DEFAULT_SINGLE_FACTOR = 0.25f; - static final float DEFAULT_MULTI_FACTOR = 0.50f; - static final float DEFAULT_MEMORY_FACTOR = 0.25f; - - /** Statistics thread */ - static final int statThreadPeriod = 60; - - /** Concurrent map (the cache) */ - private ConcurrentHashMap<String,CachedBlock> map; - - /** Eviction lock (locked when eviction in process) */ - private final ReentrantLock evictionLock = new ReentrantLock(true); - - /** Volatile boolean to track if we are in an eviction process or not */ - private volatile boolean evictionInProgress = false; - - /** Eviction thread */ - private EvictionThread evictionThread; - - /** Statistics thread 
schedule pool (for heavy debugging, could remove) */ - private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats")); - - /** Current size of cache */ - private AtomicLong size; - - /** Current number of cached elements */ - private AtomicLong elements; - - /** Cache access count (sequential ID) */ - private AtomicLong count; - - /** Cache statistics */ - private CacheStats stats; - - /** Maximum allowable size of cache (block put if size > max, evict) */ - private long maxSize; - - /** Approximate block size */ - private long blockSize; - - /** Acceptable size of cache (no evictions if size < acceptable) */ - private float acceptableFactor = DEFAULT_ACCEPTABLE_FACTOR; - - /** Minimum threshold of cache (when evicting, evict until size < min) */ - private float minFactor = DEFAULT_MIN_FACTOR; - - /** Single access bucket size */ - private float singleFactor = DEFAULT_SINGLE_FACTOR; - - /** Multiple access bucket size */ - private float multiFactor = DEFAULT_MULTI_FACTOR; - - /** In-memory bucket size */ - private float memoryFactor = DEFAULT_MEMORY_FACTOR; - - /** LruBlockCache cache = new LruBlockCache **/ - private float mapLoadFactor = DEFAULT_LOAD_FACTOR; - - /** LruBlockCache cache = new LruBlockCache **/ - private int mapConcurrencyLevel = DEFAULT_CONCURRENCY_LEVEL; - - /** Overhead of the structure itself */ - private long overhead; - - private boolean useEvictionThread = true; - - /** - * Default constructor. Specify maximum size and expected average block size (approximation is fine). - * - * <p> - * All other factors will be calculated based on defaults specified in this class. 
- * - * @param conf - * accumulo configuration - * @param maxSize - * maximum size of cache, in bytes - * @param blockSize - * approximate size of each block, in bytes - */ - public void start(AccumuloConfiguration conf, long maxSize, long blockSize) { - int mapInitialSize = (int) Math.ceil(1.2 * maxSize / blockSize); - - if (singleFactor + multiFactor + memoryFactor != 1) { - throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0"); - } - if (minFactor >= acceptableFactor) { - throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); - } - if (minFactor >= 1.0f || acceptableFactor >= 1.0f) { - throw new IllegalArgumentException("all factors must be < 1"); - } - this.maxSize = maxSize; - this.blockSize = blockSize; - map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); - this.stats = new CacheStats(); - this.count = new AtomicLong(0); - this.elements = new AtomicLong(0); - this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); - this.size = new AtomicLong(this.overhead); - - if (useEvictionThread) { - this.evictionThread = new EvictionThread(this); - this.evictionThread.start(); - while (!this.evictionThread.running()) { - try { - Thread.sleep(10); - } catch (InterruptedException ex) { - throw new RuntimeException(ex); - } - } - } else { - this.evictionThread = null; - } - this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); - } - - public void stop() {} - - public float getMinFactor() { - return minFactor; - } - - public float getSingleFactor() { - return singleFactor; - } - - public float getMultiFactor() { - return multiFactor; - } - - public float getMemoryFactor() { - return memoryFactor; - } - - public float getMapLoadFactor() { - return mapLoadFactor; - } - - public int getMapConcurrencyLevel() { - return mapConcurrencyLevel; - } - - public long getOverhead() { - return 
overhead; - } - - public boolean isUseEvictionThread() { - return useEvictionThread; - } - - public void setMinFactor(float minFactor) { - this.minFactor = minFactor; - } - - public void setSingleFactor(float singleFactor) { - this.singleFactor = singleFactor; - } - - public void setMultiFactor(float multiFactor) { - this.multiFactor = multiFactor; - } - - public void setMemoryFactor(float memoryFactor) { - this.memoryFactor = memoryFactor; - } - - public void setMapLoadFactor(float mapLoadFactor) { - this.mapLoadFactor = mapLoadFactor; - } - - public void setMapConcurrencyLevel(int mapConcurrencyLevel) { - this.mapConcurrencyLevel = mapConcurrencyLevel; - } - - public void setOverhead(long overhead) { - this.overhead = overhead; - } - - public void setUseEvictionThread(boolean useEvictionThread) { - this.useEvictionThread = useEvictionThread; - } - - public float getAcceptableFactor() { - return acceptableFactor; - } - - public void setAcceptableFactor(float acceptableFactor) { - this.acceptableFactor = acceptableFactor; - } - - public void setMaxSize(long maxSize) { - this.maxSize = maxSize; - if (this.size.get() > acceptableSize() && !evictionInProgress) { - runEviction(); - } - } - - // BlockCache implementation - - /** - * Cache the block with the specified name and buffer. - * <p> - * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a - * race condition and will update the buffer but not modify the size of the cache. 
- * - * @param blockName - * block name - * @param buf - * block buffer - * @param inMemory - * if block is in-memory - */ - @Override - public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) { - CachedBlock cb = map.get(blockName); - if (cb != null) { - stats.duplicateReads(); - cb.access(count.incrementAndGet()); - } else { - cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory); - CachedBlock currCb = map.putIfAbsent(blockName, cb); - if (currCb != null) { - stats.duplicateReads(); - cb = currCb; - cb.access(count.incrementAndGet()); - } else { - // Actually added block to cache - long newSize = size.addAndGet(cb.heapSize()); - elements.incrementAndGet(); - if (newSize > acceptableSize() && !evictionInProgress) { - runEviction(); - } - } - } - - return cb; - } - - /** - * Cache the block with the specified name and buffer. - * <p> - * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a - * race condition and will update the buffer but not modify the size of the cache. - * - * @param blockName - * block name - * @param buf - * block buffer - */ - @Override - public CacheEntry cacheBlock(String blockName, byte buf[]) { - return cacheBlock(blockName, buf, false); - } - - /** - * Get the buffer of the block with the specified name. - * - * @param blockName - * block name - * @return buffer of specified block name, or null if not in cache - */ - @Override - public CachedBlock getBlock(String blockName) { - CachedBlock cb = map.get(blockName); - if (cb == null) { - stats.miss(); - return null; - } - stats.hit(); - cb.access(count.incrementAndGet()); - return cb; - } - - protected long evictBlock(CachedBlock block) { - map.remove(block.getName()); - size.addAndGet(-1 * block.heapSize()); - elements.decrementAndGet(); - stats.evicted(); - return block.heapSize(); - } - - /** - * Multi-threaded call to run the eviction process. 
- */ - private void runEviction() { - if (evictionThread == null) { - evict(); - } else { - evictionThread.evict(); - } - } - - /** - * Eviction method. - */ - void evict() { - - // Ensure only one eviction at a time - if (!evictionLock.tryLock()) - return; - - try { - evictionInProgress = true; - - long bytesToFree = size.get() - minSize(); - - log.trace("Block cache LRU eviction started. Attempting to free {} bytes", bytesToFree); - - if (bytesToFree <= 0) - return; - - // Instantiate priority buckets - BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize, singleSize()); - BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize, multiSize()); - BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize, memorySize()); - - // Scan entire map putting into appropriate buckets - for (CachedBlock cachedBlock : map.values()) { - switch (cachedBlock.getPriority()) { - case SINGLE: { - bucketSingle.add(cachedBlock); - break; - } - case MULTI: { - bucketMulti.add(cachedBlock); - break; - } - case MEMORY: { - bucketMemory.add(cachedBlock); - break; - } - } - } - - PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3); - - bucketQueue.add(bucketSingle); - bucketQueue.add(bucketMulti); - bucketQueue.add(bucketMemory); - - int remainingBuckets = 3; - long bytesFreed = 0; - - BlockBucket bucket; - while ((bucket = bucketQueue.poll()) != null) { - long overflow = bucket.overflow(); - if (overflow > 0) { - long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets)); - bytesFreed += bucket.free(bucketBytesToFree); - } - remainingBuckets--; - } - - float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024)); - float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024)); - float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024)); - - log.trace("Block cache LRU eviction completed. Freed {} bytes. 
Priority Sizes: Single={}MB ({}), Multi={}MB ({}), Memory={}MB ({})", bytesFreed, - singleMB, bucketSingle.totalSize(), multiMB, bucketMulti.totalSize(), memoryMB, bucketMemory.totalSize()); - - } finally { - stats.evict(); - evictionInProgress = false; - evictionLock.unlock(); - } - } - - /** - * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). Once bucketed, the eviction algorithm - * takes the appropriate number of elements out of each according to configuration parameters and their relatives sizes. - */ - private class BlockBucket implements Comparable<BlockBucket> { - - private CachedBlockQueue queue; - private long totalSize = 0; - private long bucketSize; - - public BlockBucket(long bytesToFree, long blockSize, long bucketSize) { - this.bucketSize = bucketSize; - queue = new CachedBlockQueue(bytesToFree, blockSize); - totalSize = 0; - } - - public void add(CachedBlock block) { - totalSize += block.heapSize(); - queue.add(block); - } - - public long free(long toFree) { - CachedBlock[] blocks = queue.get(); - long freedBytes = 0; - for (int i = 0; i < blocks.length; i++) { - freedBytes += evictBlock(blocks[i]); - if (freedBytes >= toFree) { - return freedBytes; - } - } - return freedBytes; - } - - public long overflow() { - return totalSize - bucketSize; - } - - public long totalSize() { - return totalSize; - } - - @Override - public int compareTo(BlockBucket that) { - if (this.overflow() == that.overflow()) - return 0; - return this.overflow() > that.overflow() ? 1 : -1; - } - - @Override - public int hashCode() { - return Objects.hashCode(overflow()); - } - - @Override - public boolean equals(Object that) { - if (that instanceof BlockBucket) - return compareTo((BlockBucket) that) == 0; - return false; - } - } - - @Override - public long getMaxSize() { - return this.maxSize; - } - - /** - * Get the current size of this cache. 
- * - * @return current size in bytes - */ - public long getCurrentSize() { - return this.size.get(); - } - - /** - * Get the current size of this cache. - * - * @return current size in bytes - */ - public long getFreeSize() { - return getMaxSize() - getCurrentSize(); - } - - /** - * Get the size of this cache (number of cached blocks) - * - * @return number of cached blocks - */ - public long size() { - return this.elements.get(); - } - - /** - * Get the number of eviction runs that have occurred - */ - public long getEvictionCount() { - return this.stats.getEvictionCount(); - } - - /** - * Get the number of blocks that have been evicted during the lifetime of this cache. - */ - public long getEvictedCount() { - return this.stats.getEvictedCount(); - } - - /** - * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows above the acceptable level. - * - * <p> - * Thread is triggered into action by {@link LruBlockCache#runEviction()} - */ - private static class EvictionThread extends Thread { - private WeakReference<LruBlockCache> cache; - private boolean running = false; - - public EvictionThread(LruBlockCache cache) { - super("LruBlockCache.EvictionThread"); - setDaemon(true); - this.cache = new WeakReference<>(cache); - } - - public synchronized boolean running() { - return running; - } - - @Override - public void run() { - while (true) { - synchronized (this) { - running = true; - try { - this.wait(); - } catch (InterruptedException e) {} - } - LruBlockCache cache = this.cache.get(); - if (cache == null) - break; - cache.evict(); - } - } - - public void evict() { - synchronized (this) { - this.notify(); - } - } - } - - /* - * Statistics thread. Periodically prints the cache statistics to the log. 
- */ - private static class StatisticsThread extends Thread { - LruBlockCache lru; - - public StatisticsThread(LruBlockCache lru) { - super("LruBlockCache.StatisticsThread"); - setDaemon(true); - this.lru = lru; - } - - @Override - public void run() { - lru.logStats(); - } - } - - public void logStats() { - // Log size - long totalSize = heapSize(); - long freeSize = maxSize - totalSize; - float sizeMB = ((float) totalSize) / ((float) (1024 * 1024)); - float freeMB = ((float) freeSize) / ((float) (1024 * 1024)); - float maxMB = ((float) maxSize) / ((float) (1024 * 1024)); - log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB ({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={}," - + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, maxSize, size(), - stats.requestCount(), stats.hitCount(), stats.getMissCount(), stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100, - stats.getMissRatio() * 100, stats.evictedPerEviction(), stats.getDuplicateReads()); - } - - /** - * Get counter statistics for this cache. - * - * <p> - * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes. 
- */ - public CacheStats getStats() { - return this.stats; - } - - public static class CacheStats implements BlockCache.Stats { - private final AtomicLong accessCount = new AtomicLong(0); - private final AtomicLong hitCount = new AtomicLong(0); - private final AtomicLong missCount = new AtomicLong(0); - private final AtomicLong evictionCount = new AtomicLong(0); - private final AtomicLong evictedCount = new AtomicLong(0); - private final AtomicLong duplicateReads = new AtomicLong(0); - - public void miss() { - missCount.incrementAndGet(); - accessCount.incrementAndGet(); - } - - public void hit() { - hitCount.incrementAndGet(); - accessCount.incrementAndGet(); - } - - public void evict() { - evictionCount.incrementAndGet(); - } - - public void duplicateReads() { - duplicateReads.incrementAndGet(); - } - - public void evicted() { - evictedCount.incrementAndGet(); - } - - @Override - public long requestCount() { - return accessCount.get(); - } - - public long getMissCount() { - return missCount.get(); - } - - @Override - public long hitCount() { - return hitCount.get(); - } - - public long getEvictionCount() { - return evictionCount.get(); - } - - public long getDuplicateReads() { - return duplicateReads.get(); - } - - public long getEvictedCount() { - return evictedCount.get(); - } - - public double getHitRatio() { - return ((float) hitCount() / (float) requestCount()); - } - - public double getMissRatio() { - return ((float) getMissCount() / (float) requestCount()); - } - - public double evictedPerEviction() { - return (float) getEvictedCount() / (float) getEvictionCount(); - } - } - - public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE) - + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + ClassSize.OBJECT); - - // HeapSize implementation - @Override - public long heapSize() { - return getCurrentSize(); - } - - public static long calculateOverhead(long maxSize, long blockSize, 
int concurrency) { - return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((int) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY) - + (concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT); - } - - // Simple calculators of sizes given factors and maxSize - - private long acceptableSize() { - return (long) Math.floor(this.maxSize * this.acceptableFactor); - } - - private long minSize() { - return (long) Math.floor(this.maxSize * this.minFactor); - } - - private long singleSize() { - return (long) Math.floor(this.maxSize * this.singleFactor * this.minFactor); - } - - private long multiSize() { - return (long) Math.floor(this.maxSize * this.multiFactor * this.minFactor); - } - - private long memorySize() { - return (long) Math.floor(this.maxSize * this.memoryFactor * this.minFactor); - } - - public void shutdown() { - this.scheduleThreadPool.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java deleted file mode 100644 index ef2f664..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file.blockfile.cache; - -import static java.util.Objects.requireNonNull; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.Policy; -import com.github.benmanes.caffeine.cache.stats.CacheStats; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -/** - * A block cache that is memory bounded using the W-TinyLFU eviction algorithm. This implementation delegates to a Caffeine cache to provide concurrent O(1) - * read and write operations. 
- * <ul> - * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li> - * <li>Caffeine: https://github.com/ben-manes/caffeine</li> - * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li> - * </ul> - */ -public final class TinyLfuBlockCache implements BlockCache { - private static final Logger log = LoggerFactory.getLogger(TinyLfuBlockCache.class); - private static final int STATS_PERIOD_SEC = 60; - - private Cache<String,Block> cache; - private Policy.Eviction<String,Block> policy; - private ScheduledExecutorService statsExecutor; - - public void start(AccumuloConfiguration conf, long maxSize, long blockSize) { - cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * maxSize / blockSize)).weigher((String blockName, Block block) -> { - int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING; - return keyWeight + block.weight(); - }).maximumWeight(maxSize).recordStats().build(); - policy = cache.policy().eviction().get(); - - statsExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true) - .build()); - statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS); - } - - public void stop() {} - - @Override - public long getMaxSize() { - return policy.getMaximum(); - } - - @Override - public CacheEntry getBlock(String blockName) { - return cache.getIfPresent(blockName); - } - - @Override - public CacheEntry cacheBlock(String blockName, byte[] buffer) { - return cache.asMap().compute(blockName, (key, block) -> { - if (block == null) { - return new Block(buffer); - } - block.buffer = buffer; - return block; - }); - } - - @Override - public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored */boolean inMemory) { - return cacheBlock(blockName, buffer); - } - - @Override - public BlockCache.Stats getStats() { - CacheStats stats = cache.stats(); - return new 
BlockCache.Stats() { - @Override - public long hitCount() { - return stats.hitCount(); - } - - @Override - public long requestCount() { - return stats.requestCount(); - } - }; - } - - private void logStats() { - double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024)); - double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) (1024 * 1024)); - double freeMB = maxMB - sizeMB; - log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, freeMB, maxMB, cache.estimatedSize()); - log.debug(cache.stats().toString()); - } - - private static final class Block implements CacheEntry { - private volatile byte[] buffer; - private volatile Object index; - - Block(byte[] buffer) { - this.buffer = requireNonNull(buffer); - } - - @Override - public byte[] getBuffer() { - return buffer; - } - - @Override - public Object getIndex() { - return index; - } - - @Override - public void setIndex(Object index) { - this.index = index; - } - - int weight() { - return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY; - } - } -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java new file mode 100644 index 0000000..349be7f --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java @@ -0,0 +1,637 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache.lru; + +import java.lang.ref.WeakReference; +import java.util.Objects; +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; +import org.apache.accumulo.core.file.blockfile.cache.CachedBlock; +import org.apache.accumulo.core.file.blockfile.cache.CachedBlockQueue; +import org.apache.accumulo.core.file.blockfile.cache.ClassSize; +import org.apache.accumulo.core.file.blockfile.cache.HeapSize; +import org.apache.accumulo.core.file.blockfile.cache.SizeConstants; +import org.apache.accumulo.core.util.NamingThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A block cache implementation that is memory-aware using {@link HeapSize}, memory-bound using an LRU eviction algorithm, and concurrent: backed by a + * {@link ConcurrentHashMap} and with a non-blocking eviction thread giving constant-time {@link #cacheBlock} and {@link #getBlock} operations. 
+ * + * <p> + * Contains three levels of block priority to allow for scan-resistance and in-memory families. A block is added with an inMemory flag if necessary, otherwise a + * block becomes a single access priority. Once a block is accessed again, it changes to multiple access. This is used to prevent scans from thrashing the + * cache, adding a least-frequently-used element to the eviction algorithm. + * + * <p> + * Each priority is given its own chunk of the total cache to ensure fairness during eviction. Each priority will retain close to its maximum size, however, if + * any priority is not using its entire chunk the others are able to grow beyond their chunk size. + * + * <p> + * Instantiated from an {@link LruBlockCacheConfiguration}, which supplies at a minimum the total size and average block size. All sizes are in bytes. The block size is not especially important as this cache is + * fully dynamic in its sizing of blocks. It is only used for pre-allocating data structures and in initial heap estimation of the map. + * + * <p> + * The configuration also defines the relative sizes of the three priorities (their factors must total 1.0) and the levels that trigger and + * control the eviction process. + * + * <p> + * The acceptable size is the cache size level which triggers the eviction process to start. It evicts enough blocks to get the size below the minimum size + * specified. + * + * <p> + * Eviction happens in a separate thread when the configuration enables one (otherwise on the thread that caches the block) and involves a single full-scan of the map. It determines how many bytes must be freed to reach the minimum size, and + * then while scanning determines the fewest least-recently-used blocks necessary from each of the three priorities (would be 3 times bytes to free). It then + * uses the priority chunk sizes to evict fairly according to the relative sizes and usage.
+ */ +public class LruBlockCache implements BlockCache, HeapSize { + + private static final Logger log = LoggerFactory.getLogger(LruBlockCache.class); + + /** Period, in seconds, between statistics log messages */ + static final int statThreadPeriod = 60; + + /** Concurrent map (the cache) */ + private final ConcurrentHashMap<String,CachedBlock> map; + + /** Eviction lock (locked when eviction in process) */ + private final ReentrantLock evictionLock = new ReentrantLock(true); + + /** Volatile boolean to track if we are in an eviction process or not */ + private volatile boolean evictionInProgress = false; + + /** Eviction thread, or null when eviction runs on the thread that caches the block */ + private final EvictionThread evictionThread; + + /** Statistics thread schedule pool (for heavy debugging, could remove) */ + private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats")); + + /** Current size of cache */ + private AtomicLong size; + + /** Current number of cached elements */ + private AtomicLong elements; + + /** Cache access count (sequential ID) */ + private AtomicLong count; + + /** Cache statistics */ + private CacheStats stats; + + /** Overhead of the structure itself */ + private final long overhead; + + private final LruBlockCacheConfiguration conf; + + /** + * Creates a cache from the given configuration, which specifies the maximum size and expected average block size (approximation is fine). + * + * <p> + * All other settings (map load factor and concurrency level, eviction thresholds, and priority factors) are also read from the configuration.
+ * + * @param conf + * block cache configuration, which supplies the maximum size of the cache and the approximate size of each block + * (both in bytes) along with the map settings and the eviction and priority factors + * @throws IllegalArgumentException + * if the single, multi, and memory factors do not total 1.0, if minFactor is not smaller than + * acceptableFactor, or if minFactor or acceptableFactor is not below 1.0 + */ + public LruBlockCache(final LruBlockCacheConfiguration conf) { + this.conf = conf; + + int mapInitialSize = (int) Math.ceil(1.2 * conf.getMaxSize() / conf.getBlockSize()); + + if (conf.getSingleFactor() + conf.getMultiFactor() + conf.getMemoryFactor() != 1) { + throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0"); + } + if (conf.getMinFactor() >= conf.getAcceptableFactor()) { + throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); + } + if (conf.getMinFactor() >= 1.0f || conf.getAcceptableFactor() >= 1.0f) { + throw new IllegalArgumentException("all factors must be < 1"); + } + map = new ConcurrentHashMap<>(mapInitialSize, conf.getMapLoadFactor(), conf.getMapConcurrencyLevel()); + this.stats = new CacheStats(); + this.count = new AtomicLong(0); + this.elements = new AtomicLong(0); + this.overhead = calculateOverhead(conf.getMaxSize(), conf.getBlockSize(), conf.getMapConcurrencyLevel()); + this.size = new AtomicLong(this.overhead); + + if (conf.isUseEvictionThread()) { + this.evictionThread = new EvictionThread(this); + this.evictionThread.start(); + while (!this.evictionThread.running()) { + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } else { + this.evictionThread = null; + } + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); + } + + public void start() {} + + public void stop() {} + + public long getOverhead() { + return overhead; + } + + // BlockCache implementation + + /** + * Cache the block with the specified name and buffer. + * <p> + * It is assumed this will NEVER be called on an already cached block.
If that is done, it is assumed that you are reinserting the same exact block due to a + * race condition; the already-cached block is kept and returned (its buffer is not replaced) and the size of the cache is not modified. + * + * @param blockName + * block name + * @param buf + * block buffer + * @param inMemory + * if block is in-memory + */ + @Override + public CacheEntry cacheBlock(String blockName, byte buf[], boolean inMemory) { + CachedBlock cb = map.get(blockName); + if (cb != null) { + stats.duplicateReads(); + cb.access(count.incrementAndGet()); + } else { + cb = new CachedBlock(blockName, buf, count.incrementAndGet(), inMemory); + CachedBlock currCb = map.putIfAbsent(blockName, cb); + if (currCb != null) { + stats.duplicateReads(); + cb = currCb; + cb.access(count.incrementAndGet()); + } else { + // Actually added block to cache + long newSize = size.addAndGet(cb.heapSize()); + elements.incrementAndGet(); + if (newSize > acceptableSize() && !evictionInProgress) { + runEviction(); + } + } + } + + return cb; + } + + /** + * Cache the block with the specified name and buffer, without in-memory priority. + * <p> + * It is assumed this will NEVER be called on an already cached block. If that is done, it is assumed that you are reinserting the same exact block due to a + * race condition; the already-cached block is kept and returned (its buffer is not replaced) and the size of the cache is not modified. + * + * @param blockName + * block name + * @param buf + * block buffer + */ + @Override + public CacheEntry cacheBlock(String blockName, byte buf[]) { + return cacheBlock(blockName, buf, false); + } + + /** + * Get the buffer of the block with the specified name.
+ * + * @param blockName + * block name + * @return buffer of specified block name, or null if not in cache + */ + @Override + public CachedBlock getBlock(String blockName) { + CachedBlock cb = map.get(blockName); + if (cb == null) { + stats.miss(); + return null; + } + stats.hit(); + cb.access(count.incrementAndGet()); + return cb; + } + + protected long evictBlock(CachedBlock block) { + map.remove(block.getName()); + size.addAndGet(-1 * block.heapSize()); + elements.decrementAndGet(); + stats.evicted(); + return block.heapSize(); + } + + /** + * Multi-threaded call to run the eviction process. + */ + private void runEviction() { + if (evictionThread == null) { + evict(); + } else { + evictionThread.evict(); + } + } + + /** + * Eviction method. + */ + void evict() { + + // Ensure only one eviction at a time + if (!evictionLock.tryLock()) + return; + + try { + evictionInProgress = true; + + long bytesToFree = size.get() - minSize(); + + log.trace("Block cache LRU eviction started. Attempting to free {} bytes", bytesToFree); + + if (bytesToFree <= 0) + return; + + // Instantiate priority buckets + BlockBucket bucketSingle = new BlockBucket(bytesToFree, conf.getBlockSize(), singleSize()); + BlockBucket bucketMulti = new BlockBucket(bytesToFree, conf.getBlockSize(), multiSize()); + BlockBucket bucketMemory = new BlockBucket(bytesToFree, conf.getBlockSize(), memorySize()); + + // Scan entire map putting into appropriate buckets + for (CachedBlock cachedBlock : map.values()) { + switch (cachedBlock.getPriority()) { + case SINGLE: { + bucketSingle.add(cachedBlock); + break; + } + case MULTI: { + bucketMulti.add(cachedBlock); + break; + } + case MEMORY: { + bucketMemory.add(cachedBlock); + break; + } + } + } + + PriorityQueue<BlockBucket> bucketQueue = new PriorityQueue<>(3); + + bucketQueue.add(bucketSingle); + bucketQueue.add(bucketMulti); + bucketQueue.add(bucketMemory); + + int remainingBuckets = 3; + long bytesFreed = 0; + + BlockBucket bucket; + while ((bucket = 
bucketQueue.poll()) != null) { + long overflow = bucket.overflow(); + if (overflow > 0) { + long bucketBytesToFree = Math.min(overflow, (long) Math.ceil((bytesToFree - bytesFreed) / (double) remainingBuckets)); + bytesFreed += bucket.free(bucketBytesToFree); + } + remainingBuckets--; + } + + float singleMB = ((float) bucketSingle.totalSize()) / ((float) (1024 * 1024)); + float multiMB = ((float) bucketMulti.totalSize()) / ((float) (1024 * 1024)); + float memoryMB = ((float) bucketMemory.totalSize()) / ((float) (1024 * 1024)); + + log.trace("Block cache LRU eviction completed. Freed {} bytes. Priority Sizes: Single={}MB ({}), Multi={}MB ({}), Memory={}MB ({})", bytesFreed, + singleMB, bucketSingle.totalSize(), multiMB, bucketMulti.totalSize(), memoryMB, bucketMemory.totalSize()); + + } finally { + stats.evict(); + evictionInProgress = false; + evictionLock.unlock(); + } + } + + /** + * Used to group blocks into priority buckets. There will be a BlockBucket for each priority (single, multi, memory). Once bucketed, the eviction algorithm + * takes the appropriate number of elements out of each according to configuration parameters and their relative sizes.
+ */ + private class BlockBucket implements Comparable<BlockBucket> { + + private CachedBlockQueue queue; + private long totalSize = 0; + private long bucketSize; + + public BlockBucket(long bytesToFree, long blockSize, long bucketSize) { + this.bucketSize = bucketSize; + queue = new CachedBlockQueue(bytesToFree, blockSize); + totalSize = 0; + } + + public void add(CachedBlock block) { + totalSize += block.heapSize(); + queue.add(block); + } + + public long free(long toFree) { + CachedBlock[] blocks = queue.get(); + long freedBytes = 0; + for (int i = 0; i < blocks.length; i++) { + freedBytes += evictBlock(blocks[i]); + if (freedBytes >= toFree) { + return freedBytes; + } + } + return freedBytes; + } + + public long overflow() { + return totalSize - bucketSize; + } + + public long totalSize() { + return totalSize; + } + + @Override + public int compareTo(BlockBucket that) { + if (this.overflow() == that.overflow()) + return 0; + return this.overflow() > that.overflow() ? 1 : -1; + } + + @Override + public int hashCode() { + return Objects.hashCode(overflow()); + } + + @Override + public boolean equals(Object that) { + if (that instanceof BlockBucket) + return compareTo((BlockBucket) that) == 0; + return false; + } + } + + @Override + public long getMaxSize() { + return this.conf.getMaxSize(); + } + + /** + * Get the current size of this cache. + * + * @return current size in bytes + */ + public long getCurrentSize() { + return this.size.get(); + } + + /** + * Get the current size of this cache. 
+ * + * @return current size in bytes + */ + public long getFreeSize() { + return getMaxSize() - getCurrentSize(); + } + + /** + * Get the size of this cache (number of cached blocks) + * + * @return number of cached blocks + */ + public long size() { + return this.elements.get(); + } + + /** + * Get the number of eviction runs that have occurred + */ + public long getEvictionCount() { + return this.stats.getEvictionCount(); + } + + /** + * Get the number of blocks that have been evicted during the lifetime of this cache. + */ + public long getEvictedCount() { + return this.stats.getEvictedCount(); + } + + /** + * Eviction thread. Sits in waiting state until an eviction is triggered when the cache size grows above the acceptable level. + * + * <p> + * Thread is triggered into action by {@link LruBlockCache#runEviction()} + */ + private static class EvictionThread extends Thread { + private WeakReference<LruBlockCache> cache; + private boolean running = false; + + public EvictionThread(LruBlockCache cache) { + super("LruBlockCache.EvictionThread"); + setDaemon(true); + this.cache = new WeakReference<>(cache); + } + + public synchronized boolean running() { + return running; + } + + @Override + public void run() { + while (true) { + synchronized (this) { + running = true; + try { + this.wait(); + } catch (InterruptedException e) {} + } + LruBlockCache cache = this.cache.get(); + if (cache == null) + break; + cache.evict(); + } + } + + public void evict() { + synchronized (this) { + this.notify(); + } + } + } + + /* + * Statistics thread. Periodically prints the cache statistics to the log. 
+   */
+  private static class StatisticsThread extends Thread {
+    private final LruBlockCache lru;
+
+    public StatisticsThread(LruBlockCache lru) {
+      super("LruBlockCache.StatisticsThread");
+      setDaemon(true);
+      this.lru = lru;
+    }
+
+    @Override
+    public void run() {
+      lru.logStats();
+    }
+  }
+
+  /**
+   * Logs the current cache occupancy together with the access, hit, miss and eviction counters at debug level.
+   */
+  public void logStats() {
+    // Log size
+    long totalSize = heapSize();
+    long freeSize = this.conf.getMaxSize() - totalSize;
+    float sizeMB = ((float) totalSize) / ((float) (1024 * 1024));
+    float freeMB = ((float) freeSize) / ((float) (1024 * 1024));
+    float maxMB = ((float) this.conf.getMaxSize()) / ((float) (1024 * 1024));
+    // The message is two concatenated literals, so the separator after "Evicted={}," has to be spelled out explicitly.
+    log.debug("Cache Stats: Sizes: Total={}MB ({}), Free={}MB ({}), Max={}MB ({}), Counts: Blocks={}, Access={}, Hit={}, Miss={}, Evictions={}, Evicted={}, "
+        + "Ratios: Hit Ratio={}%, Miss Ratio={}%, Evicted/Run={}, Duplicate Reads={}", sizeMB, totalSize, freeMB, freeSize, maxMB, this.conf.getMaxSize(),
+        size(), stats.requestCount(), stats.hitCount(), stats.getMissCount(), stats.getEvictionCount(), stats.getEvictedCount(), stats.getHitRatio() * 100,
+        stats.getMissRatio() * 100, stats.evictedPerEviction(), stats.getDuplicateReads());
+  }
+
+  /**
+   * Get counter statistics for this cache.
+   *
+   * <p>
+   * Includes: total accesses, hits, misses, evicted blocks, and runs of the eviction processes.
+   */
+  public CacheStats getStats() {
+    return this.stats;
+  }
+
+  /**
+   * Thread-safe counters backing {@link #getStats()}. Ratios are derived from the counters on demand and are NaN while their denominator is still zero.
+   */
+  public static class CacheStats implements BlockCache.Stats {
+    private final AtomicLong accessCount = new AtomicLong(0);
+    private final AtomicLong hitCount = new AtomicLong(0);
+    private final AtomicLong missCount = new AtomicLong(0);
+    private final AtomicLong evictionCount = new AtomicLong(0);
+    private final AtomicLong evictedCount = new AtomicLong(0);
+    private final AtomicLong duplicateReads = new AtomicLong(0);
+
+    /** Records a miss; also counts as one access. */
+    public void miss() {
+      missCount.incrementAndGet();
+      accessCount.incrementAndGet();
+    }
+
+    /** Records a hit; also counts as one access. */
+    public void hit() {
+      hitCount.incrementAndGet();
+      accessCount.incrementAndGet();
+    }
+
+    /** Records one run of the eviction process. */
+    public void evict() {
+      evictionCount.incrementAndGet();
+    }
+
+    /** Records one duplicate read. */
+    public void duplicateReads() {
+      duplicateReads.incrementAndGet();
+    }
+
+    /** Records one evicted block. */
+    public void evicted() {
+      evictedCount.incrementAndGet();
+    }
+
+    @Override
+    public long requestCount() {
+      return accessCount.get();
+    }
+
+    public long getMissCount() {
+      return missCount.get();
+    }
+
+    @Override
+    public long hitCount() {
+      return hitCount.get();
+    }
+
+    public long getEvictionCount() {
+      return evictionCount.get();
+    }
+
+    public long getDuplicateReads() {
+      return duplicateReads.get();
+    }
+
+    public long getEvictedCount() {
+      return evictedCount.get();
+    }
+
+    // Ratios are computed in double. A float has only a 24-bit significand, so counters above ~16.7M would be rounded.
+    public double getHitRatio() {
+      return ((double) hitCount() / (double) requestCount());
+    }
+
+    public double getMissRatio() {
+      return ((double) getMissCount() / (double) requestCount());
+    }
+
+    public double evictedPerEviction() {
+      return (double) getEvictedCount() / (double) getEvictionCount();
+    }
+  }
+
+  public final static long CACHE_FIXED_OVERHEAD = ClassSize.align((3 * SizeConstants.SIZEOF_LONG) + (8 * ClassSize.REFERENCE)
+      + (5 * SizeConstants.SIZEOF_FLOAT) + SizeConstants.SIZEOF_BOOLEAN + ClassSize.OBJECT);
+
+  // HeapSize implementation
+  @Override
+  public long heapSize() {
+    return getCurrentSize();
+  }
+
+  /**
+   * Estimates the fixed heap overhead of a cache: the cache object itself plus its backing concurrent map, excluding the cached block contents.
+   *
+   * @param maxSize
+   *          maximum cache size
+   * @param blockSize
+   *          expected block size, used to estimate the number of map entries (with 20% headroom)
+   * @param concurrency
+   *          number of segments in the backing concurrent map
+   * @return estimated overhead
+   */
+  public static long calculateOverhead(long maxSize, long blockSize, int concurrency) {
+    // Keep the entry count and the products in long; an int cast here saturates/overflows for large caches or small blocks.
+    return CACHE_FIXED_OVERHEAD + ClassSize.CONCURRENT_HASHMAP + ((long) Math.ceil(maxSize * 1.2 / blockSize) * ClassSize.CONCURRENT_HASHMAP_ENTRY)
+        + ((long) concurrency * ClassSize.CONCURRENT_HASHMAP_SEGMENT);
+  }
+
+  // Simple calculators of sizes given factors and maxSize
+
+  /** Size up to which no eviction is needed: maxSize * acceptableFactor. */
+  private long acceptableSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getAcceptableFactor());
+  }
+
+  /** Target size when evicting: maxSize * minFactor. */
+  private long minSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getMinFactor());
+  }
+
+  /** Share of the minimum size assigned to the single-access bucket. */
+  private long singleSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getSingleFactor() * this.conf.getMinFactor());
+  }
+
+  /** Share of the minimum size assigned to the multiple-access bucket. */
+  private long multiSize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getMultiFactor() * this.conf.getMinFactor());
+  }
+
+  /** Share of the minimum size assigned to the in-memory bucket. */
+  private long memorySize() {
+    return (long) Math.floor(this.conf.getMaxSize() * this.conf.getMemoryFactor() * this.conf.getMinFactor());
+  }
+
+  /** Shuts down this cache's scheduled thread pool. */
+  public void shutdown() {
+    this.scheduleThreadPool.shutdown();
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
new file mode 100644
index 0000000..b7fb472
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
+
+/**
+ * Tuning parameters for {@link LruBlockCache}. Each value is read from the arbitrary-property namespace using the {@code *_PROPERTY} keys below, and the
+ * matching {@code DEFAULT_*} constant is passed as the default.
+ */
+public final class LruBlockCacheConfiguration extends BlockCacheConfiguration {
+
+  /** Default Configuration Parameters */
+
+  /** Backing Concurrent Map Configuration */
+  public static final Float DEFAULT_LOAD_FACTOR = 0.75f;
+  public static final Integer DEFAULT_CONCURRENCY_LEVEL = 16;
+
+  /** Eviction thresholds */
+  public static final Float DEFAULT_MIN_FACTOR = 0.75f;
+  public static final Float DEFAULT_ACCEPTABLE_FACTOR = 0.85f;
+
+  /** Priority buckets */
+  public static final Float DEFAULT_SINGLE_FACTOR = 0.25f;
+  public static final Float DEFAULT_MULTI_FACTOR = 0.50f;
+  public static final Float DEFAULT_MEMORY_FACTOR = 0.25f;
+
+  // property names
+  private static final String PREFIX = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block.lru.";
+  public static final String ACCEPTABLE_FACTOR_PROPERTY = PREFIX + "acceptable.factor";
+  public static final String MIN_FACTOR_PROPERTY = PREFIX + "min.factor";
+  public static final String SINGLE_FACTOR_PROPERTY = PREFIX + "single.factor";
+  public static final String MULTI_FACTOR_PROPERTY = PREFIX + "multi.factor";
+  public static final String MEMORY_FACTOR_PROPERTY = PREFIX + "memory.factor";
+  public static final String MAP_LOAD_PROPERTY = PREFIX + "map.load";
+  public static final String MAP_CONCURRENCY_PROPERTY = PREFIX + "map.concurrency";
+  public static final String EVICTION_THREAD_PROPERTY = PREFIX + "eviction.thread";
+
+  /** Acceptable size of cache (no evictions if size < acceptable) */
+  private final float acceptableFactor;
+
+  /** Minimum threshold of cache (when evicting, evict until size < min) */
+  private final float minFactor;
+
+  /** Single access bucket size */
+  private final float singleFactor;
+
+  /** Multiple access bucket size */
+  private final float multiFactor;
+
+  /** In-memory bucket size */
+  private final float memoryFactor;
+
+  /** Load factor of the concurrent map backing the cache ({@link #MAP_LOAD_PROPERTY}) */
+  private final float mapLoadFactor;
+
+  /** Concurrency level of the concurrent map backing the cache ({@link #MAP_CONCURRENCY_PROPERTY}) */
+  private final int mapConcurrencyLevel;
+
+  /** Value of {@link #EVICTION_THREAD_PROPERTY}; defaults to true */
+  private final boolean useEvictionThread;
+
+  /**
+   * Reads every LRU tuning value from the arbitrary properties of {@code conf} via {@code getOrDefault}, passing the {@code DEFAULT_*} constants as defaults.
+   *
+   * @param conf
+   *          configuration whose {@link Property#GENERAL_ARBITRARY_PROP_PREFIX} properties are consulted
+   */
+  public LruBlockCacheConfiguration(AccumuloConfiguration conf) {
+    super(conf);
+    Map<String,String> props = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX);
+    this.acceptableFactor = getOrDefault(props, ACCEPTABLE_FACTOR_PROPERTY, DEFAULT_ACCEPTABLE_FACTOR);
+    this.minFactor = getOrDefault(props, MIN_FACTOR_PROPERTY, DEFAULT_MIN_FACTOR);
+    this.singleFactor = getOrDefault(props, SINGLE_FACTOR_PROPERTY, DEFAULT_SINGLE_FACTOR);
+    this.multiFactor = getOrDefault(props, MULTI_FACTOR_PROPERTY, DEFAULT_MULTI_FACTOR);
+    this.memoryFactor = getOrDefault(props, MEMORY_FACTOR_PROPERTY, DEFAULT_MEMORY_FACTOR);
+    this.mapLoadFactor = getOrDefault(props, MAP_LOAD_PROPERTY, DEFAULT_LOAD_FACTOR);
+    this.mapConcurrencyLevel = getOrDefault(props, MAP_CONCURRENCY_PROPERTY, DEFAULT_CONCURRENCY_LEVEL);
+    this.useEvictionThread = getOrDefault(props, EVICTION_THREAD_PROPERTY, Boolean.TRUE);
+  }
+
+  public float getAcceptableFactor() {
+    return acceptableFactor;
+  }
+
+  public float getMinFactor() {
+    return minFactor;
+  }
+
+  public float getSingleFactor() {
+    return singleFactor;
+  }
+
+  public float getMultiFactor() {
+    return multiFactor;
+  }
+
+  public float getMemoryFactor() {
+    return memoryFactor;
+  }
+
+  public float getMapLoadFactor() {
+    return mapLoadFactor;
+  }
+
+  public int getMapConcurrencyLevel() {
+    return mapConcurrencyLevel;
+  }
+
+  public boolean isUseEvictionThread() {
+    return useEvictionThread;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java
new file mode 100644
index 0000000..b923b8e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.lru;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory;
+
+/**
+ * {@link BlockCacheFactory} that builds an {@link LruBlockCache} configured from the supplied Accumulo configuration.
+ */
+public class LruBlockCacheFactory extends BlockCacheFactory {
+
+  @Override
+  public BlockCache getBlockCache(AccumuloConfiguration conf) {
+    LruBlockCacheConfiguration lruConf = new LruBlockCacheConfiguration(conf);
+    return new LruBlockCache(lruConf);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
new file mode 100644
index 0000000..fccd55c
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.accumulo.core.file.blockfile.cache.ClassSize;
+import org.apache.accumulo.core.file.blockfile.cache.SizeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Policy;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A block cache that is memory bounded using the W-TinyLFU eviction algorithm. This implementation delegates to a Caffeine cache to provide concurrent O(1)
+ * read and write operations.
+ * <ul>
+ * <li>W-TinyLFU: http://arxiv.org/pdf/1512.00727.pdf</li>
+ * <li>Caffeine: https://github.com/ben-manes/caffeine</li>
+ * <li>Cache design: http://highscalability.com/blog/2016/1/25/design-of-a-modern-cache.html</li>
+ * </ul>
+ */
+public final class TinyLfuBlockCache implements BlockCache {
+  private static final Logger log = LoggerFactory.getLogger(TinyLfuBlockCache.class);
+  private static final int STATS_PERIOD_SEC = 60;
+
+  private final Cache<String,Block> cache;
+  private final Policy.Eviction<String,Block> policy;
+  private final ScheduledExecutorService statsExecutor;
+
+  /**
+   * Builds a Caffeine cache bounded by the configured maximum weight, where each entry weighs the approximate heap size of its key and block. Also schedules
+   * statistics logging every {@value #STATS_PERIOD_SEC} seconds on a daemon thread.
+   *
+   * @param conf
+   *          supplies the maximum size (the weight bound) and the block size used to pre-size the cache
+   */
+  public TinyLfuBlockCache(TinyLfuBlockCacheConfiguration conf) {
+    cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * conf.getMaxSize() / conf.getBlockSize())).weigher((String blockName, Block block) -> {
+      int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING;
+      return keyWeight + block.weight();
+    }).maximumWeight(conf.getMaxSize()).recordStats().build();
+    policy = cache.policy().eviction().get();
+    statsExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("TinyLfuBlockCacheStatsExecutor").setDaemon(true)
+        .build());
+    statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void start() {}
+
+  /**
+   * Stops the periodic statistics logging. The scheduled task captures {@code this}, and the executor's live thread keeps that task reachable. Without
+   * shutting the executor down, a discarded cache and every block it holds could never be garbage collected.
+   */
+  @Override
+  public void stop() {
+    statsExecutor.shutdown();
+  }
+
+  @Override
+  public long getMaxSize() {
+    return policy.getMaximum();
+  }
+
+  @Override
+  public CacheEntry getBlock(String blockName) {
+    return cache.getIfPresent(blockName);
+  }
+
+  /**
+   * Caches {@code buffer} under {@code blockName}. An existing entry is updated in place, so any index already attached to it is kept.
+   */
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buffer) {
+    return cache.asMap().compute(blockName, (key, block) -> {
+      if (block == null) {
+        return new Block(buffer);
+      }
+      block.buffer = buffer;
+      return block;
+    });
+  }
+
+  @Override
+  public CacheEntry cacheBlock(String blockName, byte[] buffer, /* ignored */boolean inMemory) {
+    return cacheBlock(blockName, buffer);
+  }
+
+  @Override
+  public BlockCache.Stats getStats() {
+    CacheStats stats = cache.stats();
+    return new BlockCache.Stats() {
+      @Override
+      public long hitCount() {
+        return stats.hitCount();
+      }
+
+      @Override
+      public long requestCount() {
+        return stats.requestCount();
+      }
+    };
+  }
+
+  /** Logs current weighted size, free space and Caffeine statistics at debug level. */
+  private void logStats() {
+    double maxMB = ((double) policy.getMaximum()) / ((double) (1024 * 1024));
+    double sizeMB = ((double) policy.weightedSize().getAsLong()) / ((double) (1024 * 1024));
+    double freeMB = maxMB - sizeMB;
+    log.debug("Cache Size={}MB, Free={}MB, Max={}MB, Blocks={}", sizeMB, freeMB, maxMB, cache.estimatedSize());
+    // parameterized so CacheStats.toString() only runs when debug logging is enabled
+    log.debug("{}", cache.stats());
+  }
+
+  private static final class Block implements CacheEntry {
+    private volatile byte[] buffer;
+    private volatile Object index;
+
+    Block(byte[] buffer) {
+      this.buffer = requireNonNull(buffer);
+    }
+
+    @Override
+    public byte[] getBuffer() {
+      return buffer;
+    }
+
+    @Override
+    public Object getIndex() {
+      return index;
+    }
+
+    @Override
+    public void setIndex(Object index) {
+      this.index = index;
+    }
+
+    /** Approximate heap footprint: the aligned buffer length plus fixed object, array, reference and long overheads. */
+    int weight() {
+      return ClassSize.align(buffer.length) + SizeConstants.SIZEOF_LONG + ClassSize.REFERENCE + ClassSize.OBJECT + ClassSize.ARRAY;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
new file mode 100644
index 0000000..3d1efa5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
+
+/**
+ * Configuration for {@link TinyLfuBlockCache}. It defines no settings of its own; everything comes from {@link BlockCacheConfiguration}.
+ */
+public final class TinyLfuBlockCacheConfiguration extends BlockCacheConfiguration {
+
+  public TinyLfuBlockCacheConfiguration(AccumuloConfiguration conf) {
+    super(conf);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java
new file mode 100644
index 0000000..33db576
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache.tinylfu;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory;
+
+/**
+ * {@link BlockCacheFactory} that builds a {@link TinyLfuBlockCache} configured from the supplied Accumulo configuration.
+ */
+public class TinyLfuBlockCacheFactory extends BlockCacheFactory {
+
+  @Override
+  public BlockCache getBlockCache(AccumuloConfiguration conf) {
+    TinyLfuBlockCacheConfiguration tinyLfuConf = new TinyLfuBlockCacheConfiguration(conf);
+    return new TinyLfuBlockCache(tinyLfuConf);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/40c1cb0b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index 73cabf2..ab98816 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -91,7 +91,7 @@ public class SummaryReader {
     }
 
     @Override
-    public void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws Exception {}
+    public void start() {}
 
     @Override
     public void stop() {}
