Repository: accumulo Updated Branches: refs/heads/ACCUMULO-4463 [created] 03a8f2acf
ACCUMULO-4463: Make block cache implementation configurable Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/03a8f2ac Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/03a8f2ac Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/03a8f2ac Branch: refs/heads/ACCUMULO-4463 Commit: 03a8f2acfb4f95b545c47a4e681dd5b8fa585c94 Parents: 2e171cd Author: Dave Marion <[email protected]> Authored: Mon May 8 11:47:53 2017 -0400 Committer: Dave Marion <[email protected]> Committed: Mon May 8 11:47:53 2017 -0400 ---------------------------------------------------------------------- .../core/client/rfile/RFileScanner.java | 20 ++- .../org/apache/accumulo/core/conf/Property.java | 3 +- .../core/file/blockfile/cache/BlockCache.java | 20 +++ .../file/blockfile/cache/BlockCacheFactory.java | 32 ++++ .../file/blockfile/cache/LruBlockCache.java | 155 ++++++++++++------- .../file/blockfile/cache/TinyLfuBlockCache.java | 11 +- .../accumulo/core/summary/SummaryReader.java | 6 + .../file/blockfile/cache/TestLruBlockCache.java | 76 +++++---- .../accumulo/core/file/rfile/RFileTest.java | 6 +- .../tserver/TabletServerResourceManager.java | 27 ++-- 10 files changed, 251 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 1b12fb6..57322a8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -127,6 +127,12 @@ class RFileScanner extends ScannerOptions implements Scanner { 
} }; } + + @Override + public void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws Exception {} + + @Override + public void stop() {} } RFileScanner(Opts opts) { @@ -136,13 +142,23 @@ class RFileScanner extends ScannerOptions implements Scanner { this.opts = opts; if (opts.indexCacheSize > 0) { - this.indexCache = new LruBlockCache(opts.indexCacheSize, CACHE_BLOCK_SIZE); + this.indexCache = new LruBlockCache(); + try { + this.indexCache.start((AccumuloConfiguration) null, opts.indexCacheSize, CACHE_BLOCK_SIZE); + } catch (Exception e) { + throw new RuntimeException("Error starting cache", e); + } } else { this.indexCache = new NoopCache(); } if (opts.dataCacheSize > 0) { - this.dataCache = new LruBlockCache(opts.dataCacheSize, CACHE_BLOCK_SIZE); + this.dataCache = new LruBlockCache(); + try { + this.dataCache.start((AccumuloConfiguration) null, opts.dataCacheSize, CACHE_BLOCK_SIZE); + } catch (Exception e) { + throw new RuntimeException("Error starting cache", e); + } } else { this.dataCache = new NoopCache(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 5480867..0bbaf10 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -245,7 +245,8 @@ public enum Property { TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"), TSERV_CLIENT_TIMEOUT("tserver.client.timeout", "3s", PropertyType.TIMEDURATION, "Time to wait for clients to continue scans before closing a session."), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the 
tserver caches"), - TSERV_CACHE_POLICY("tserver.cache.policy", "LRU", PropertyType.STRING, "Specifies the eviction policy of the file data caches (LRU or TinyLFU)."), + TSERV_CACHE_IMPL("tserver.cache.class", "org.apache.accumulo.core.file.blockfile.cache.LruBlockCache.class", PropertyType.STRING, + "Specifies the class name of the block cache implementation."), TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for file data blocks."), TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "25%", PropertyType.MEMORY, "Specifies the size of the cache for file indices."), TSERV_SUMMARYCACHE_SIZE("tserver.cache.summary.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for summary data on each tablet server."), http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java index 82f8b1e..c5a17e4 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java @@ -17,10 +17,30 @@ */ package org.apache.accumulo.core.file.blockfile.cache; +import org.apache.accumulo.core.conf.AccumuloConfiguration; + /** * Block cache interface. */ public interface BlockCache { + + /** + * Start the block cache + * + * @param conf + * Accumulo configuration object + * @param maxSize + * maximum size of the on-heap cache + * @param blockSize + * size of the default RFile blocks + */ + void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws Exception; + + /** + * Stop the block cache and release resources + */ + void stop(); + /** * Add block to cache. 
* http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java new file mode 100644 index 0000000..70cbc7d --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; + +public class BlockCacheFactory { + + public static BlockCache getBlockCache(AccumuloConfiguration conf) throws Exception { + String impl = conf.get(Property.TSERV_CACHE_IMPL); + Class<? 
extends BlockCache> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCache.class); + return clazz.newInstance(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java index cbdaca5..921b5a5 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.util.NamingThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +85,7 @@ public class LruBlockCache implements BlockCache, HeapSize { static final int statThreadPeriod = 60; /** Concurrent map (the cache) */ - private final ConcurrentHashMap<String,CachedBlock> map; + private ConcurrentHashMap<String,CachedBlock> map; /** Eviction lock (locked when eviction in process) */ private final ReentrantLock evictionLock = new ReentrantLock(true); @@ -93,22 +94,22 @@ public class LruBlockCache implements BlockCache, HeapSize { private volatile boolean evictionInProgress = false; /** Eviction thread */ - private final EvictionThread evictionThread; + private EvictionThread evictionThread; /** Statistics thread schedule pool (for heavy debugging, could remove) */ private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats")); /** Current size of cache */ - private final AtomicLong size; + private AtomicLong 
size; /** Current number of cached elements */ - private final AtomicLong elements; + private AtomicLong elements; /** Cache access count (sequential ID) */ - private final AtomicLong count; + private AtomicLong count; /** Cache statistics */ - private final CacheStats stats; + private CacheStats stats; /** Maximum allowable size of cache (block put if size > max, evict) */ private long maxSize; @@ -117,74 +118,47 @@ public class LruBlockCache implements BlockCache, HeapSize { private long blockSize; /** Acceptable size of cache (no evictions if size < acceptable) */ - private float acceptableFactor; + private float acceptableFactor = DEFAULT_ACCEPTABLE_FACTOR; /** Minimum threshold of cache (when evicting, evict until size < min) */ - private float minFactor; + private float minFactor = DEFAULT_MIN_FACTOR; /** Single access bucket size */ - private float singleFactor; + private float singleFactor = DEFAULT_SINGLE_FACTOR; /** Multiple access bucket size */ - private float multiFactor; + private float multiFactor = DEFAULT_MULTI_FACTOR; /** In-memory bucket size */ - private float memoryFactor; + private float memoryFactor = DEFAULT_MEMORY_FACTOR; + + /** Load factor of the backing ConcurrentHashMap */ + private float mapLoadFactor = DEFAULT_LOAD_FACTOR; + + /** Concurrency level of the backing ConcurrentHashMap */ + private int mapConcurrencyLevel = DEFAULT_CONCURRENCY_LEVEL; /** Overhead of the structure itself */ private long overhead; + private boolean useEvictionThread = true; + /** * Default constructor. Specify maximum size and expected average block size (approximation is fine). * * <p> * All other factors will be calculated based on defaults specified in this class. * + * @param conf + * accumulo configuration * @param maxSize * maximum size of cache, in bytes * @param blockSize * approximate size of each block, in bytes */ - public LruBlockCache(long maxSize, long blockSize) { - this(maxSize, blockSize, true); - } - - /** - * Constructor used for testing. 
Allows disabling of the eviction thread. - */ - public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) { - this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / blockSize), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR, - DEFAULT_ACCEPTABLE_FACTOR, DEFAULT_SINGLE_FACTOR, DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR); - } + public void start(AccumuloConfiguration conf, long maxSize, long blockSize) { + int mapInitialSize = (int) Math.ceil(1.2 * maxSize / blockSize); - /** - * Configurable constructor. Use this constructor if not using defaults. - * - * @param maxSize - * maximum size of this cache, in bytes - * @param blockSize - * expected average size of blocks, in bytes - * @param evictionThread - * whether to run evictions in a bg thread or not - * @param mapInitialSize - * initial size of backing ConcurrentHashMap - * @param mapLoadFactor - * initial load factor of backing ConcurrentHashMap - * @param mapConcurrencyLevel - * initial concurrency factor for backing CHM - * @param minFactor - * percentage of total size that eviction will evict until - * @param acceptableFactor - * percentage of total size that triggers eviction - * @param singleFactor - * percentage of total size for single-access blocks - * @param multiFactor - * percentage of total size for multiple-access blocks - * @param memoryFactor - * percentage of total size for in-memory blocks - */ - public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float minFactor, - float acceptableFactor, float singleFactor, float multiFactor, float memoryFactor) { if (singleFactor + multiFactor + memoryFactor != 1) { throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0"); } @@ -197,18 +171,13 @@ public class LruBlockCache implements BlockCache, HeapSize { this.maxSize = maxSize; this.blockSize = blockSize; map = new 
ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, mapConcurrencyLevel); - this.minFactor = minFactor; - this.acceptableFactor = acceptableFactor; - this.singleFactor = singleFactor; - this.multiFactor = multiFactor; - this.memoryFactor = memoryFactor; this.stats = new CacheStats(); this.count = new AtomicLong(0); this.elements = new AtomicLong(0); this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); this.size = new AtomicLong(this.overhead); - if (evictionThread) { + if (useEvictionThread) { this.evictionThread = new EvictionThread(this); this.evictionThread.start(); while (!this.evictionThread.running()) { @@ -224,6 +193,80 @@ public class LruBlockCache implements BlockCache, HeapSize { this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); } + public void stop() {} + + public float getMinFactor() { + return minFactor; + } + + public float getSingleFactor() { + return singleFactor; + } + + public float getMultiFactor() { + return multiFactor; + } + + public float getMemoryFactor() { + return memoryFactor; + } + + public float getMapLoadFactor() { + return mapLoadFactor; + } + + public int getMapConcurrencyLevel() { + return mapConcurrencyLevel; + } + + public long getOverhead() { + return overhead; + } + + public boolean isUseEvictionThread() { + return useEvictionThread; + } + + public void setMinFactor(float minFactor) { + this.minFactor = minFactor; + } + + public void setSingleFactor(float singleFactor) { + this.singleFactor = singleFactor; + } + + public void setMultiFactor(float multiFactor) { + this.multiFactor = multiFactor; + } + + public void setMemoryFactor(float memoryFactor) { + this.memoryFactor = memoryFactor; + } + + public void setMapLoadFactor(float mapLoadFactor) { + this.mapLoadFactor = mapLoadFactor; + } + + public void setMapConcurrencyLevel(int mapConcurrencyLevel) { + this.mapConcurrencyLevel = mapConcurrencyLevel; + } + + public void 
setOverhead(long overhead) { + this.overhead = overhead; + } + + public void setUseEvictionThread(boolean useEvictionThread) { + this.useEvictionThread = useEvictionThread; + } + + public float getAcceptableFactor() { + return acceptableFactor; + } + + public void setAcceptableFactor(float acceptableFactor) { + this.acceptableFactor = acceptableFactor; + } + public void setMaxSize(long maxSize) { this.maxSize = maxSize; if (this.size.get() > acceptableSize() && !evictionInProgress) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java index bab52af..ef2f664 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java @@ -23,6 +23,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,11 +46,11 @@ public final class TinyLfuBlockCache implements BlockCache { private static final Logger log = LoggerFactory.getLogger(TinyLfuBlockCache.class); private static final int STATS_PERIOD_SEC = 60; - private final Cache<String,Block> cache; - private final Policy.Eviction<String,Block> policy; - private final ScheduledExecutorService statsExecutor; + private Cache<String,Block> cache; + private Policy.Eviction<String,Block> policy; + private ScheduledExecutorService statsExecutor; - public TinyLfuBlockCache(long maxSize, long blockSize) { + public void start(AccumuloConfiguration conf, long 
maxSize, long blockSize) { cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * maxSize / blockSize)).weigher((String blockName, Block block) -> { int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING; return keyWeight + block.weight(); @@ -61,6 +62,8 @@ public final class TinyLfuBlockCache implements BlockCache { statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, STATS_PERIOD_SEC, TimeUnit.SECONDS); } + public void stop() {} + @Override public long getMaxSize() { return policy.getMaximum(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index 9b2b5d9..73cabf2 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -89,6 +89,12 @@ public class SummaryReader { public Stats getStats() { return summaryCache.getStats(); } + + @Override + public void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws Exception {} + + @Override + public void stop() {} } private static List<SummarySerializer> load(BlockReader bcReader, Predicate<SummarizerConfiguration> summarySelector) throws IOException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java index a5ab14a..b5624c9 100644 --- 
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java @@ -22,6 +22,8 @@ import java.util.Random; import junit.framework.TestCase; +import org.apache.accumulo.core.conf.AccumuloConfiguration; + /** * Tests the concurrent LruBlockCache. * <p> @@ -36,7 +38,8 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSizeDefault(maxSize, 9); // room for 9, will evict - LruBlockCache cache = new LruBlockCache(maxSize, blockSize); + LruBlockCache cache = new LruBlockCache(); + cache.start((AccumuloConfiguration) null, maxSize, blockSize); Block[] blocks = generateFixedBlocks(10, blockSize, "block"); @@ -60,7 +63,8 @@ public class TestLruBlockCache extends TestCase { long maxSize = 1000000; long blockSize = calculateBlockSizeDefault(maxSize, 101); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize); + LruBlockCache cache = new LruBlockCache(); + cache.start((AccumuloConfiguration) null, maxSize, blockSize); Block[] blocks = generateRandomBlocks(100, blockSize); @@ -109,7 +113,9 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSizeDefault(maxSize, 10); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false); + LruBlockCache cache = new LruBlockCache(); + cache.setUseEvictionThread(false); + cache.start((AccumuloConfiguration) null, maxSize, blockSize); Block[] blocks = generateFixedBlocks(10, blockSize, "block"); @@ -146,12 +152,16 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSizeDefault(maxSize, 10); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR, - LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min - 0.99f, // acceptable - 0.25f, // single - 0.50f, // 
multi - 0.25f);// memory + LruBlockCache cache = new LruBlockCache(); + cache.setUseEvictionThread(false); + cache.setMapLoadFactor(LruBlockCache.DEFAULT_LOAD_FACTOR); + cache.setMapConcurrencyLevel(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL); + cache.setMinFactor(0.98f); + cache.setAcceptableFactor(0.99f); + cache.setSingleFactor(0.25f); + cache.setMultiFactor(0.50f); + cache.setMemoryFactor(0.25f); + cache.start((AccumuloConfiguration) null, maxSize, blockSize); Block[] singleBlocks = generateFixedBlocks(5, 10000, "single"); Block[] multiBlocks = generateFixedBlocks(5, 10000, "multi"); @@ -205,12 +215,16 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSize(maxSize, 10); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR, - LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min - 0.99f, // acceptable - 0.33f, // single - 0.33f, // multi - 0.34f);// memory + LruBlockCache cache = new LruBlockCache(); + cache.setUseEvictionThread(false); + cache.setMapLoadFactor(LruBlockCache.DEFAULT_LOAD_FACTOR); + cache.setMapConcurrencyLevel(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL); + cache.setMinFactor(0.98f); + cache.setAcceptableFactor(0.99f); + cache.setSingleFactor(0.33f); + cache.setMultiFactor(0.33f); + cache.setMemoryFactor(0.34f); + cache.start((AccumuloConfiguration) null, maxSize, blockSize); Block[] singleBlocks = generateFixedBlocks(5, blockSize, "single"); Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); @@ -323,12 +337,16 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSize(maxSize, 10); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR, - LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.66f, // min - 0.99f, // acceptable - 0.33f, // single - 0.33f, // 
multi - 0.34f);// memory + LruBlockCache cache = new LruBlockCache(); + cache.setUseEvictionThread(false); + cache.setMapLoadFactor(LruBlockCache.DEFAULT_LOAD_FACTOR); + cache.setMapConcurrencyLevel(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL); + cache.setMinFactor(0.66f); + cache.setAcceptableFactor(0.99f); + cache.setSingleFactor(0.33f); + cache.setMultiFactor(0.33f); + cache.setMemoryFactor(0.34f); + cache.start((AccumuloConfiguration) null, maxSize, blockSize); Block[] singleBlocks = generateFixedBlocks(20, blockSize, "single"); Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); @@ -382,12 +400,16 @@ public class TestLruBlockCache extends TestCase { long maxSize = 300000; long blockSize = calculateBlockSize(maxSize, 31); - LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR, - LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min - 0.99f, // acceptable - 0.33f, // single - 0.33f, // multi - 0.34f);// memory + LruBlockCache cache = new LruBlockCache(); + cache.setUseEvictionThread(false); + cache.setMapLoadFactor(LruBlockCache.DEFAULT_LOAD_FACTOR); + cache.setMapConcurrencyLevel(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL); + cache.setMinFactor(0.98f); + cache.setAcceptableFactor(0.99f); + cache.setSingleFactor(0.33f); + cache.setMultiFactor(0.33f); + cache.setMemoryFactor(0.34f); + cache.start((AccumuloConfiguration) null, maxSize, blockSize); Block[] singleBlocks = generateFixedBlocks(10, blockSize, "single"); Block[] multiBlocks = generateFixedBlocks(10, blockSize, "multi"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index fc43ef1..5cec5eb 100644 --- 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -264,8 +264,10 @@ public class RFileTest { in = new FSDataInputStream(bais); fileLength = data.length; - LruBlockCache indexCache = new LruBlockCache(100000000, 100000); - LruBlockCache dataCache = new LruBlockCache(100000000, 100000); + LruBlockCache indexCache = new LruBlockCache(); + indexCache.start((AccumuloConfiguration) null, 100000000, 100000); + LruBlockCache dataCache = new LruBlockCache(); + dataCache.start((AccumuloConfiguration) null, 100000000, 100000); CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration()); reader = new RFile.Reader(_cbr); http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index b84997d..476a4ca 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -40,8 +40,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; -import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache; -import org.apache.accumulo.core.file.blockfile.cache.TinyLfuBlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory; import org.apache.accumulo.core.metadata.schema.DataFileValue; import 
org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.NamingThreadFactory; @@ -175,17 +174,15 @@ public class TabletServerResourceManager { long sCacheSize = acuConf.getAsBytes(Property.TSERV_SUMMARYCACHE_SIZE); long totalQueueSize = acuConf.getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX); - String policy = acuConf.get(Property.TSERV_CACHE_POLICY); - if (policy.equalsIgnoreCase("LRU")) { - _iCache = new LruBlockCache(iCacheSize, blockSize); - _dCache = new LruBlockCache(dCacheSize, blockSize); - _sCache = new LruBlockCache(sCacheSize, blockSize); - } else if (policy.equalsIgnoreCase("TinyLFU")) { - _iCache = new TinyLfuBlockCache(iCacheSize, blockSize); - _dCache = new TinyLfuBlockCache(dCacheSize, blockSize); - _sCache = new TinyLfuBlockCache(sCacheSize, blockSize); - } else { - throw new IllegalArgumentException("Unknown Block cache policy " + policy); + try { + _iCache = BlockCacheFactory.getBlockCache(acuConf); + _iCache.start(acuConf, iCacheSize, blockSize); + _dCache = BlockCacheFactory.getBlockCache(acuConf); + _dCache.start(acuConf, dCacheSize, blockSize); + _sCache = BlockCacheFactory.getBlockCache(acuConf); + _sCache.start(acuConf, sCacheSize, blockSize); + } catch (Exception e) { + throw new IllegalArgumentException("Error constructing block cache", e); } Runtime runtime = Runtime.getRuntime(); @@ -543,6 +540,10 @@ public class TabletServerResourceManager { executorService.shutdown(); } + this._dCache.stop(); + this._iCache.stop(); + this._sCache.stop(); + for (Entry<String,ExecutorService> entry : threadPools.entrySet()) { while (true) { try {
