Repository: accumulo Updated Branches: refs/heads/ACCUMULO-4463 40c1cb0b6 -> b3db4be6a
ACCUMULO-4463: changes from review #2 Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b3db4be6 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b3db4be6 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b3db4be6 Branch: refs/heads/ACCUMULO-4463 Commit: b3db4be6af96dab700bb2a55d8dc9fdfbbe2ec09 Parents: 40c1cb0 Author: Dave Marion <[email protected]> Authored: Tue May 9 14:20:26 2017 -0400 Committer: Dave Marion <[email protected]> Committed: Tue May 9 14:20:26 2017 -0400 ---------------------------------------------------------------------- .../core/client/rfile/RFileScanner.java | 69 +++++---- .../accumulo/core/conf/ConfigurationCopy.java | 12 ++ .../org/apache/accumulo/core/conf/Property.java | 2 +- .../core/file/blockfile/cache/BlockCache.java | 10 -- .../cache/BlockCacheConfiguration.java | 24 +++- .../cache/BlockCacheConfigurationHelper.java | 62 ++++++++ .../file/blockfile/cache/BlockCacheFactory.java | 100 ++++++++++++- .../core/file/blockfile/cache/CacheType.java | 72 ++++++++++ .../file/blockfile/cache/lru/LruBlockCache.java | 4 - .../cache/lru/LruBlockCacheConfiguration.java | 46 +++--- .../cache/lru/LruBlockCacheFactory.java | 22 ++- .../cache/tinylfu/TinyLfuBlockCache.java | 4 - .../tinylfu/TinyLfuBlockCacheConfiguration.java | 6 +- .../cache/tinylfu/TinyLfuBlockCacheFactory.java | 22 ++- .../accumulo/core/summary/SummaryReader.java | 5 - .../blockfile/cache/BlockCacheFactoryTest.java | 55 ++++++++ .../file/blockfile/cache/CacheTypeTest.java | 54 +++++++ .../file/blockfile/cache/TestLruBlockCache.java | 141 ++++++++++--------- .../accumulo/core/file/rfile/RFileTest.java | 38 +++-- .../tserver/TabletServerResourceManager.java | 39 ++--- 20 files changed, 596 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index b4a6d14..808693c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -43,9 +43,13 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfigurationHelper; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory; import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheFactory; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; @@ -57,8 +61,6 @@ import org.apache.accumulo.core.iterators.system.MultiIterator; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.LocalityGroupUtil; -import org.apache.commons.configuration.ConfigurationMap; -import org.apache.commons.configuration.MapConfiguration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.io.Text; @@ -70,6 +72,7 @@ class RFileScanner extends ScannerOptions implements Scanner { private static final Range EMPTY_RANGE = new Range(); private Range range; + private BlockCacheFactory<?,?> factory = null; private BlockCache dataCache = null; private BlockCache indexCache = null; private Opts opts; @@ -131,11 +134,6 @@ class RFileScanner extends ScannerOptions implements Scanner { }; } - @Override - public void start() {} - - @Override - public void stop() {} } RFileScanner(Opts opts) { @@ -144,28 +142,41 @@ class RFileScanner extends ScannerOptions implements Scanner { } this.opts = opts; - if (opts.indexCacheSize > 0) { - MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); - config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(opts.indexCacheSize)); - config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(CACHE_BLOCK_SIZE)); - @SuppressWarnings("unchecked") - ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); - this.indexCache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - this.indexCache.start(); - } else { - this.indexCache = new NoopCache(); - } - if (opts.dataCacheSize > 0) { - MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); - config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(opts.dataCacheSize)); - config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(CACHE_BLOCK_SIZE)); - @SuppressWarnings("unchecked") - ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); - this.dataCache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - this.dataCache.start(); - } else { - this.dataCache = new NoopCache(); + if (opts.indexCacheSize > 0 || opts.dataCacheSize > 0) { + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + try { + factory = BlockCacheFactory.getInstance(cc); + } catch (Exception e) { + throw new RuntimeException("Error creating BlockCacheFactory", e); + } + BlockCacheConfigurationHelper helper = null; + if (opts.indexCacheSize > 0) { + helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory); + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(CACHE_BLOCK_SIZE)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(opts.indexCacheSize)); + } + if (opts.dataCacheSize > 0) { + if (null == helper) { + helper = new BlockCacheConfigurationHelper(CacheType.DATA, factory); + } else { + helper.switchCacheType(CacheType.DATA); + } + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(CACHE_BLOCK_SIZE)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(opts.dataCacheSize)); + } + factory.start(helper.getConfiguration()); + this.indexCache = factory.getBlockCache(CacheType.INDEX); + if (null == indexCache) { + this.indexCache = new NoopCache(); + } + this.dataCache = factory.getBlockCache(CacheType.DATA); + if (null == this.dataCache) { + this.dataCache = new NoopCache(); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java index cf3eb92..af937c3 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java @@ -95,4 +95,16 @@ public class ConfigurationCopy extends AccumuloConfiguration { copy.put(key, value); } + /** + * Sets a property in this configuration if it is not already defined. + * + * @param key + * key of property to set + * @param value + * property value + */ + public void setIfAbsent(String key, String value) { + copy.putIfAbsent(key, value); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index d0f7ce2..23ca6a9 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -245,7 +245,7 @@ public enum Property { TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"), TSERV_CLIENT_TIMEOUT("tserver.client.timeout", "3s", PropertyType.TIMEDURATION, "Time to wait for clients to continue scans before closing a session."), TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", PropertyType.BYTES, "Specifies a default blocksize for the tserver caches"), - TSERV_CACHE_IMPL("tserver.cache.factory.class", "org.apache.accumulo.core.file.blockfile.cache.lru.LRUBlockCacheFactory.class", PropertyType.STRING, + TSERV_CACHE_FACTORY_IMPL("tserver.cache.factory.class", "org.apache.accumulo.core.file.blockfile.cache.lru.LRUBlockCacheFactory.class", PropertyType.STRING, "Specifies the class name of the block cache factory implementation."), TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, "Specifies the size of the cache for file data blocks."), TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "25%", PropertyType.MEMORY, "Specifies the size of the cache for file indices."), http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java index f035f5d..dbfa294 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java @@ -23,16 +23,6 @@ package org.apache.accumulo.core.file.blockfile.cache; public interface BlockCache { /** - * Start the block cache - */ - void start(); - - /** - * Stop the block cache and release resources - */ - void stop(); - - /** * Add block to cache. * * @param blockName http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java index e3ccbf5..68d536d 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java @@ -24,8 +24,8 @@ import org.apache.accumulo.core.conf.Property; public class BlockCacheConfiguration { - public static final String MAX_SIZE_PROPERTY = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block." + "max.size"; - public static final String BLOCK_SIZE_PROPERTY = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block." + "block.size"; + public static final String MAX_SIZE_PROPERTY = "max.size"; + public static final String BLOCK_SIZE_PROPERTY = "block.size"; private static final Long DEFAULT = Long.valueOf(-1); @@ -35,10 +35,15 @@ public class BlockCacheConfiguration { /** Approximate block size */ private final long blockSize; - public BlockCacheConfiguration(AccumuloConfiguration conf) { + protected final BlockCacheConfigurationHelper helper; + + public BlockCacheConfiguration(AccumuloConfiguration conf, CacheType type, BlockCacheFactory<?,?> factory) { + + helper = new BlockCacheConfigurationHelper(conf, type, factory); + Map<String,String> props = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX); - this.maxSize = getOrDefault(props, MAX_SIZE_PROPERTY, DEFAULT); - this.blockSize = getOrDefault(props, BLOCK_SIZE_PROPERTY, DEFAULT); + this.maxSize = getOrDefault(props, helper.getFullPropertyName(MAX_SIZE_PROPERTY), DEFAULT); + this.blockSize = getOrDefault(props, helper.getFullPropertyName(BLOCK_SIZE_PROPERTY), DEFAULT); if (DEFAULT.equals(this.maxSize)) { throw new IllegalArgumentException("Block cache max size must be specified."); @@ -49,11 +54,16 @@ public class BlockCacheConfiguration { } public long getMaxSize() { - return maxSize; + return this.maxSize; } public long getBlockSize() { - return blockSize; + return this.blockSize; + } + + @Override + public String toString() { + return "maxSize: " + getMaxSize() + ", blockSize: " + getBlockSize(); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfigurationHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfigurationHelper.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfigurationHelper.java new file mode 100644 index 0000000..bef039e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfigurationHelper.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; + +public class BlockCacheConfigurationHelper { + + private final ConfigurationCopy conf; + private final BlockCacheFactory<?,?> factory; + private String basePropertyName; + + public BlockCacheConfigurationHelper(CacheType type, BlockCacheFactory<?,?> factory) { + this(new ConfigurationCopy(), type, factory); + } + + public BlockCacheConfigurationHelper(AccumuloConfiguration conf, CacheType type, BlockCacheFactory<?,?> factory) { + this(new ConfigurationCopy(conf), type, factory); + } + + public BlockCacheConfigurationHelper(ConfigurationCopy conf, CacheType type, BlockCacheFactory<?,?> factory) { + this.conf = conf; + this.factory = factory; + this.basePropertyName = type.getPropertyPrefix(factory.getCacheImplName()); + } + + public void switchCacheType(CacheType type) { + this.basePropertyName = type.getPropertyPrefix(factory.getCacheImplName()); + } + + public String getFullPropertyName(String propertySuffix) { + return this.basePropertyName + propertySuffix; + } + + public void set(String propertySuffix, String value) { + conf.setIfAbsent(getFullPropertyName(propertySuffix), value); + } + + public String getValue(String propertySuffix) { + return conf.get(getFullPropertyName(propertySuffix)); + } + + public ConfigurationCopy getConfiguration() { + return this.conf; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java index 57ca5f1..9fdfa5e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java @@ -17,17 +17,105 @@ */ package org.apache.accumulo.core.file.blockfile.cache; +import java.util.HashMap; +import java.util.Map; + import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class BlockCacheFactory<B extends BlockCache,C extends BlockCacheConfiguration> { + + public static final String CACHE_PROPERTY_BASE = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block."; + + private static final Logger LOG = LoggerFactory.getLogger(BlockCacheFactory.class); + private static BlockCacheFactory<?,?> factory = null; + + private final Map<CacheType,B> caches = new HashMap<>(); + + /** + * Initialize the caches for each CacheType based on the configuration + * + * @param conf + * accumulo configuration + */ + public void start(AccumuloConfiguration conf) { + for (CacheType type : CacheType.values()) { + ConfigurationCopy props = type.getCacheProperties(conf, getCacheImplName()); + if (null != props) { + C cc = this.createConfiguration(props, type, this); + B cache = this.createCache(cc); + LOG.info("Created {} cache with configuration {}", type, cc); + this.caches.put(type, cache); + } + } + } + + /** + * Stop caches and release resources + */ + public abstract void stop(); + + /** + * Get the block cache of the given type + * + * @param type + * block cache type + * @return BlockCache or null if not enabled + */ + public B getBlockCache(CacheType type) { + return caches.get(type); + } + + /** + * Parse and validate the configuration + * + * @param conf + * accumulo configuration + * @param type + * cache type + * @param name + * cache implementation name + * @return validated block cache configuration + */ + protected abstract C createConfiguration(AccumuloConfiguration conf, CacheType type, BlockCacheFactory<B,C> factory); + + /** + * Create a block cache using the supplied configuration + * + * @param conf + * cache configuration + * @return configured block cache + */ + protected abstract B createCache(C conf); -public abstract class BlockCacheFactory { + /** + * Cache implementation name (e.g lru, tinylfu, etc) + * + * @return name of cache implementation in lowercase + */ + public abstract String getCacheImplName(); - public static BlockCacheFactory getBlockCacheFactory(AccumuloConfiguration conf) throws Exception { - String impl = conf.get(Property.TSERV_CACHE_IMPL); - Class<? extends BlockCacheFactory> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCacheFactory.class); - return clazz.newInstance(); + /** + * Get the BlockCacheFactory specified by the property 'tserver.cache.factory.class' + * + * @param conf + * accumulo configuration + * @return BlockCacheFactory instance + * @throws Exception + */ + public static synchronized BlockCacheFactory<?,?> getInstance(AccumuloConfiguration conf) throws Exception { + if (null == factory) { + String impl = conf.get(Property.TSERV_CACHE_FACTORY_IMPL); + @SuppressWarnings("rawtypes") + Class<? extends BlockCacheFactory> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCacheFactory.class); + factory = (BlockCacheFactory<?,?>) clazz.newInstance(); + LOG.info("Created new block cache factory of type: {}", clazz.getSimpleName()); + } + return factory; } - public abstract BlockCache getBlockCache(AccumuloConfiguration conf); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java new file mode 100644 index 0000000..7a716fd --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CacheType.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public enum CacheType { + + DATA, INDEX, SUMMARY; + + public static final String ENABLED_SUFFIX = "enabled"; + + private static final Logger LOG = LoggerFactory.getLogger(CacheType.class); + + public String getPropertyPrefix(String impl) { + return BlockCacheFactory.CACHE_PROPERTY_BASE + impl + "." + name().toLowerCase() + "."; + } + + /** + * Return configuration properties for this cache type, if enabled + * + * @param conf + * accumulo configuration object + * @param type + * type of cache + * @param typePrefix + * property prefix for this cache type + * @return configuration for this type of cache, or null + */ + public ConfigurationCopy getCacheProperties(AccumuloConfiguration conf, String implName) { + Map<String,String> props = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX); + String prefix = getPropertyPrefix(implName); + String enabled = props.get(prefix + ENABLED_SUFFIX); + if (enabled != null && Boolean.parseBoolean(enabled)) { + LOG.info("{} cache is enabled.", this); + Map<String,String> results = new HashMap<>(); + for (Entry<String,String> prop : props.entrySet()) { + if (prop.getKey().startsWith(prefix)) { + results.put(prop.getKey(), prop.getValue()); + } + } + return new ConfigurationCopy(results); + } else { + LOG.info("{} cache is disabled.", this); + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java index 349be7f..25bc826 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java @@ -157,10 +157,6 @@ public class LruBlockCache implements BlockCache, HeapSize { this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); } - public void start() {} - - public void stop() {} - public long getOverhead() { return overhead; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java index b7fb472..32fac8a 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java @@ -22,6 +22,8 @@ import java.util.Map; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; public final class LruBlockCacheConfiguration extends BlockCacheConfiguration { @@ -41,15 +43,14 @@ public final class LruBlockCacheConfiguration extends BlockCacheConfiguration { public static final Float DEFAULT_MEMORY_FACTOR = 0.25f; // property names - private static final String PREFIX = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block.lru."; - public static final String ACCEPTABLE_FACTOR_PROPERTY = PREFIX + "acceptable.factor"; - public static final String MIN_FACTOR_PROPERTY = PREFIX + "min.factor"; - public static final String SINGLE_FACTOR_PROPERTY = PREFIX + "single.factor"; - public static final String MULTI_FACTOR_PROPERTY = PREFIX + "multi.factor"; - public static final String MEMORY_FACTOR_PROPERTY = PREFIX + "memory.factor"; - public static final String MAP_LOAD_PROPERTY = PREFIX + "map.load"; - public static final String MAP_CONCURRENCY_PROPERTY = PREFIX + "map.concurrency"; - public static final String EVICTION_THREAD_PROPERTY = PREFIX + "eviction.thread"; + public static final String ACCEPTABLE_FACTOR_PROPERTY = "acceptable.factor"; + public static final String MIN_FACTOR_PROPERTY = "min.factor"; + public static final String SINGLE_FACTOR_PROPERTY = "single.factor"; + public static final String MULTI_FACTOR_PROPERTY = "multi.factor"; + public static final String MEMORY_FACTOR_PROPERTY = "memory.factor"; + public static final String MAP_LOAD_PROPERTY = "map.load"; + public static final String MAP_CONCURRENCY_PROPERTY = "map.concurrency"; + public static final String EVICTION_THREAD_PROPERTY = "eviction.thread"; /** Acceptable size of cache (no evictions if size < acceptable) */ private final float acceptableFactor; @@ -74,17 +75,17 @@ public final class LruBlockCacheConfiguration extends BlockCacheConfiguration { private final boolean useEvictionThread; - public LruBlockCacheConfiguration(AccumuloConfiguration conf) { - super(conf); + public LruBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type, BlockCacheFactory<LruBlockCache,LruBlockCacheConfiguration> factory) { + super(conf, type, factory); Map<String,String> props = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX); - this.acceptableFactor = getOrDefault(props, ACCEPTABLE_FACTOR_PROPERTY, DEFAULT_ACCEPTABLE_FACTOR); - this.minFactor = getOrDefault(props, MIN_FACTOR_PROPERTY, DEFAULT_MIN_FACTOR); - this.singleFactor = getOrDefault(props, SINGLE_FACTOR_PROPERTY, DEFAULT_SINGLE_FACTOR); - this.multiFactor = getOrDefault(props, MULTI_FACTOR_PROPERTY, DEFAULT_MULTI_FACTOR); - this.memoryFactor = getOrDefault(props, MEMORY_FACTOR_PROPERTY, DEFAULT_MEMORY_FACTOR); - this.mapLoadFactor = getOrDefault(props, MAP_LOAD_PROPERTY, DEFAULT_LOAD_FACTOR); - this.mapConcurrencyLevel = getOrDefault(props, MAP_CONCURRENCY_PROPERTY, DEFAULT_CONCURRENCY_LEVEL); - this.useEvictionThread = getOrDefault(props, EVICTION_THREAD_PROPERTY, Boolean.TRUE); + this.acceptableFactor = getOrDefault(props, helper.getFullPropertyName(ACCEPTABLE_FACTOR_PROPERTY), DEFAULT_ACCEPTABLE_FACTOR); + this.minFactor = getOrDefault(props, helper.getFullPropertyName(MIN_FACTOR_PROPERTY), DEFAULT_MIN_FACTOR); + this.singleFactor = getOrDefault(props, helper.getFullPropertyName(SINGLE_FACTOR_PROPERTY), DEFAULT_SINGLE_FACTOR); + this.multiFactor = getOrDefault(props, helper.getFullPropertyName(MULTI_FACTOR_PROPERTY), DEFAULT_MULTI_FACTOR); + this.memoryFactor = getOrDefault(props, helper.getFullPropertyName(MEMORY_FACTOR_PROPERTY), DEFAULT_MEMORY_FACTOR); + this.mapLoadFactor = getOrDefault(props, helper.getFullPropertyName(MAP_LOAD_PROPERTY), DEFAULT_LOAD_FACTOR); + this.mapConcurrencyLevel = getOrDefault(props, helper.getFullPropertyName(MAP_CONCURRENCY_PROPERTY), DEFAULT_CONCURRENCY_LEVEL); + this.useEvictionThread = getOrDefault(props, helper.getFullPropertyName(EVICTION_THREAD_PROPERTY), Boolean.TRUE); } public float getAcceptableFactor() { @@ -119,4 +120,11 @@ public final class LruBlockCacheConfiguration extends BlockCacheConfiguration { return useEvictionThread; } + @Override + public String toString() { + return super.toString() + ", acceptableFactor: " + this.getAcceptableFactor() + ", minFactor: " + this.getMinFactor() + ", singleFactor: " + + this.getSingleFactor() + ", multiFactor: " + this.getMultiFactor() + ", memoryFactor: " + this.getMemoryFactor() + ", mapLoadFactor: " + + this.getMapLoadFactor() + ", mapConcurrencyLevel: " + this.getMapConcurrencyLevel() + ", useEvictionThread: " + this.isUseEvictionThread(); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java index b923b8e..b3318a0 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java @@ -18,14 +18,28 @@ package org.apache.accumulo.core.file.blockfile.cache.lru; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; -public class LruBlockCacheFactory extends BlockCacheFactory { +public class LruBlockCacheFactory extends BlockCacheFactory<LruBlockCache,LruBlockCacheConfiguration> { @Override - public BlockCache getBlockCache(AccumuloConfiguration conf) { - return new LruBlockCache(new LruBlockCacheConfiguration(conf)); + protected LruBlockCacheConfiguration createConfiguration(AccumuloConfiguration conf, CacheType type, + BlockCacheFactory<LruBlockCache,LruBlockCacheConfiguration> factory) { + return new LruBlockCacheConfiguration(conf, type, factory); } + @Override + protected LruBlockCache createCache(LruBlockCacheConfiguration conf) { + return new LruBlockCache(conf); + } + + @Override + public String getCacheImplName() { + return "lru"; + } + + @Override + public void stop() {} + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java index fccd55c..e76ccd9 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCache.java @@ -65,10 +65,6 @@ public final class TinyLfuBlockCache implements BlockCache { } - public void start() {} - - public void stop() {} - @Override public long getMaxSize() { return policy.getMaximum(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java index 3d1efa5..7b06872 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java @@ -19,11 +19,13 @@ package org.apache.accumulo.core.file.blockfile.cache.tinylfu; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; public final class TinyLfuBlockCacheConfiguration extends BlockCacheConfiguration { - public TinyLfuBlockCacheConfiguration(AccumuloConfiguration conf) { - super(conf); + public TinyLfuBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type, BlockCacheFactory<TinyLfuBlockCache,TinyLfuBlockCacheConfiguration> factory) { + super(conf, type, factory); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java index 33db576..f45d7a7 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java @@ -18,14 +18,28 @@ package org.apache.accumulo.core.file.blockfile.cache.tinylfu; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; -public class TinyLfuBlockCacheFactory extends BlockCacheFactory { +public class TinyLfuBlockCacheFactory extends BlockCacheFactory<TinyLfuBlockCache,TinyLfuBlockCacheConfiguration> { @Override - public BlockCache getBlockCache(AccumuloConfiguration conf) { - return new TinyLfuBlockCache(new TinyLfuBlockCacheConfiguration(conf)); + protected TinyLfuBlockCacheConfiguration createConfiguration(AccumuloConfiguration conf, CacheType type, + BlockCacheFactory<TinyLfuBlockCache,TinyLfuBlockCacheConfiguration> factory) { + return new TinyLfuBlockCacheConfiguration(conf, type, factory); } + @Override + protected TinyLfuBlockCache createCache(TinyLfuBlockCacheConfiguration conf) { + return new TinyLfuBlockCache(conf); + } + + @Override + public String getCacheImplName() { + return "tinylfu"; + } + + @Override + public void stop() {} + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index ab98816..88b1d85 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -90,11 +90,6 @@ public class SummaryReader { return summaryCache.getStats(); } - @Override - public void start() {} - - @Override - public void stop() {} } private static List<SummarySerializer> load(BlockReader bcReader, Predicate<SummarizerConfiguration> summarySelector) throws IOException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java new file mode 100644 index 0000000..beef072 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache; + +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheFactory; +import org.junit.Assert; +import org.junit.Test; + +public class BlockCacheFactoryTest { + + @Test + public void testCreateLruBlockCacheFactory() throws Exception { + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + BlockCacheFactory.getInstance(cc); + } + + @Test + public void testCreateTinyLfuBlockCacheFactory() throws Exception { + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, TinyLfuBlockCacheFactory.class.getName()); + BlockCacheFactory.getInstance(cc); + } + + @Test + public void testStart() throws Exception { + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + BlockCacheFactory<?,?> factory = BlockCacheFactory.getInstance(cc); + String indexPrefix = CacheType.INDEX.getPropertyPrefix(factory.getCacheImplName()); + cc.setIfAbsent(indexPrefix + CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + cc.setIfAbsent(indexPrefix + BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(2048)); + cc.setIfAbsent(indexPrefix + BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(1048576)); + factory.start(cc); + Assert.assertNotNull(factory.getBlockCache(CacheType.INDEX)); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/CacheTypeTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/CacheTypeTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/CacheTypeTest.java new file mode 100644 index 0000000..9b0c2da --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/CacheTypeTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache; + +import org.apache.accumulo.core.conf.ConfigurationCopy; +import org.junit.Assert; +import org.junit.Test; + +public class CacheTypeTest { + + @Test + public void testGetPropertyPrefix() throws Exception { + Assert.assertEquals("general.custom.cache.block.lru.data.", CacheType.DATA.getPropertyPrefix("lru")); + } + + @Test + public void testCacheEnabled() { + ConfigurationCopy cc = new ConfigurationCopy(); + String indexPrefix = CacheType.INDEX.getPropertyPrefix("lru"); + cc.setIfAbsent(indexPrefix + CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + cc.setIfAbsent(indexPrefix + BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(100000)); + cc.setIfAbsent(indexPrefix + BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(100000000)); + ConfigurationCopy cc2 = CacheType.INDEX.getCacheProperties(cc, "lru"); + Assert.assertNotNull(cc2); + Assert.assertEquals(Boolean.TRUE.toString(), cc2.get(indexPrefix + CacheType.ENABLED_SUFFIX)); + Assert.assertEquals(Long.toString(100000), cc2.get(indexPrefix + BlockCacheConfiguration.BLOCK_SIZE_PROPERTY)); + Assert.assertEquals(Long.toString(100000000), cc2.get(indexPrefix + BlockCacheConfiguration.MAX_SIZE_PROPERTY)); + } + + @Test + public void testCacheDisabled() { + ConfigurationCopy cc = new ConfigurationCopy(); + String indexPrefix = CacheType.INDEX.getPropertyPrefix("lru"); + cc.setIfAbsent(indexPrefix + CacheType.ENABLED_SUFFIX, Boolean.FALSE.toString()); + ConfigurationCopy cc2 = CacheType.INDEX.getCacheProperties(cc, "lru"); + Assert.assertNull(cc2); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java index d84fa19..2dc244b 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java @@ -18,16 +18,15 @@ package org.apache.accumulo.core.file.blockfile.cache; import java.util.Arrays; -import java.util.HashMap; import java.util.Random; import junit.framework.TestCase; import org.apache.accumulo.core.conf.ConfigurationCopy; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; -import org.apache.commons.configuration.ConfigurationMap; -import org.apache.commons.configuration.MapConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheFactory; /** * Tests the concurrent LruBlockCache. @@ -43,13 +42,15 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSizeDefault(maxSize, 9); // room for 9, will evict - MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); - config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); - config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); - @SuppressWarnings("unchecked") - ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); - LruBlockCache cache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - cache.start(); + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + BlockCacheFactory<?,?> factory = BlockCacheFactory.getInstance(cc); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory); + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); + factory.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); Block[] blocks = generateFixedBlocks(10, blockSize, "block"); @@ -73,13 +74,15 @@ public class TestLruBlockCache extends TestCase { long maxSize = 1000000; long blockSize = calculateBlockSizeDefault(maxSize, 101); - MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); - config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); - config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); - @SuppressWarnings("unchecked") - ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); - LruBlockCache cache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - cache.start(); + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + BlockCacheFactory<?,?> factory = BlockCacheFactory.getInstance(cc); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory); + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); + factory.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); Block[] blocks = generateRandomBlocks(100, blockSize); @@ -128,14 +131,16 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSizeDefault(maxSize, 10); - MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); - config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); - config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); - config.setProperty(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.toString(false)); - @SuppressWarnings("unchecked") - ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); - LruBlockCache cache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - cache.start(); + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + BlockCacheFactory<?,?> factory = BlockCacheFactory.getInstance(cc); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory); + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); + helper.set(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); + factory.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); Block[] blocks = generateFixedBlocks(10, blockSize, "block"); @@ -172,19 +177,21 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSizeDefault(maxSize, 10); - MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); - config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); - config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); - config.setProperty(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.toString(false)); - config.setProperty(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.98f)); - config.setProperty(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); - config.setProperty(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.25f)); - config.setProperty(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.50f)); - config.setProperty(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.25f)); - @SuppressWarnings("unchecked") - ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); - LruBlockCache cache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - cache.start(); + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + BlockCacheFactory<?,?> factory = BlockCacheFactory.getInstance(cc); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory); + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); + helper.set(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); + helper.set(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.98f)); + helper.set(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); + helper.set(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.25f)); + helper.set(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.50f)); + helper.set(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.25f)); + factory.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); Block[] singleBlocks = generateFixedBlocks(5, 10000, "single"); Block[] multiBlocks = generateFixedBlocks(5, 10000, "multi"); @@ -238,19 +245,21 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSize(maxSize, 10); - MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); - config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); - config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); - config.setProperty(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.toString(false)); - config.setProperty(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.98f)); - config.setProperty(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); - config.setProperty(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.33f)); - config.setProperty(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.33f)); - config.setProperty(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.34f)); - @SuppressWarnings("unchecked") - ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); - LruBlockCache cache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - cache.start(); + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + BlockCacheFactory<?,?> factory = BlockCacheFactory.getInstance(cc); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory); + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); + helper.set(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); + helper.set(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.98f)); + helper.set(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); + helper.set(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.33f)); + helper.set(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.33f)); + helper.set(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.34f)); + factory.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); Block[] singleBlocks = generateFixedBlocks(5, blockSize, "single"); Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); @@ -363,19 +372,21 @@ public class TestLruBlockCache extends TestCase { long maxSize = 100000; long blockSize = calculateBlockSize(maxSize, 10); - MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); - config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); - config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); - config.setProperty(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.toString(false)); - config.setProperty(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.66f)); - config.setProperty(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); - config.setProperty(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.33f)); - config.setProperty(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.33f)); - config.setProperty(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.34f)); - @SuppressWarnings("unchecked") - ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); - LruBlockCache cache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - cache.start(); + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + BlockCacheFactory<?,?> factory = BlockCacheFactory.getInstance(cc); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory); + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(maxSize)); + helper.set(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); + helper.set(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.66f)); + helper.set(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); + helper.set(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.33f)); + helper.set(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.33f)); + helper.set(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.34f)); + factory.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); Block[] singleBlocks = generateFixedBlocks(20, blockSize, "single"); Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index c66e393..5b04b41 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -34,7 +34,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -58,8 +57,12 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.file.FileSKVIterator; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfigurationHelper; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheFactory; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; import org.apache.accumulo.core.file.streams.PositionedOutputs; @@ -72,8 +75,6 @@ import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.sample.impl.SamplerFactory; import org.apache.accumulo.core.security.crypto.CryptoTest; import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.commons.configuration.ConfigurationMap; -import org.apache.commons.configuration.MapConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -268,16 +269,25 @@ public class RFileTest { in = new FSDataInputStream(bais); fileLength = data.length; - MapConfiguration config = new MapConfiguration(new HashMap<String,String>()); - config.setProperty(LruBlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(100000000)); - config.setProperty(LruBlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(100000)); - @SuppressWarnings("unchecked") - ConfigurationCopy copy = new ConfigurationCopy(new ConfigurationMap(config)); - LruBlockCache indexCache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - indexCache.start(); - - LruBlockCache dataCache = new LruBlockCache(new LruBlockCacheConfiguration(copy)); - dataCache.start(); + ConfigurationCopy cc = new ConfigurationCopy(); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + BlockCacheFactory<?,?> factory; + try { + factory = BlockCacheFactory.getInstance(cc); + } catch (Exception e) { + throw new RuntimeException("Error creating BlockCacheFactory", e); + } + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory); + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(100000)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(100000000)); + helper.switchCacheType(CacheType.DATA); + helper.set(CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + helper.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(100000)); + helper.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(100000000)); + factory.start(helper.getConfiguration()); + LruBlockCache indexCache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); + LruBlockCache dataCache = (LruBlockCache) factory.getBlockCache(CacheType.DATA); CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration()); reader = new RFile.Reader(_cbr); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3db4be6/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index bf08f4b..276e29f 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -43,6 +43,7 @@ import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.NamingThreadFactory; @@ -100,6 +101,7 @@ public class TabletServerResourceManager { private final MemoryManagementFramework memMgmt; + private final BlockCacheFactory<?,?> factory; private final BlockCache _dCache; private final BlockCache _iCache; private final BlockCache _sCache; @@ -176,26 +178,31 @@ public class TabletServerResourceManager { long sCacheSize = acuConf.getAsBytes(Property.TSERV_SUMMARYCACHE_SIZE); long totalQueueSize = acuConf.getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX); - BlockCacheFactory factory; try { - factory = BlockCacheFactory.getBlockCacheFactory(acuConf); + factory = BlockCacheFactory.getInstance(acuConf); } catch (Exception e) { - throw new RuntimeException("Error creating Block Cache Factory", e); + throw new RuntimeException("Error creating BlockCacheFactory", e); } - ConfigurationCopy copy = new ConfigurationCopy(acuConf); - copy.set(BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); - copy.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(iCacheSize)); - _iCache = factory.getBlockCache(copy); - _iCache.start(); + ConfigurationCopy cc = new ConfigurationCopy(acuConf); + String indexPrefix = CacheType.INDEX.getPropertyPrefix(factory.getCacheImplName()); + cc.setIfAbsent(indexPrefix + CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + cc.setIfAbsent(indexPrefix + BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); + cc.setIfAbsent(indexPrefix + BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(iCacheSize)); + String dataPrefix = CacheType.DATA.getPropertyPrefix(factory.getCacheImplName()); + cc.setIfAbsent(dataPrefix + CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + cc.setIfAbsent(dataPrefix + BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); + cc.setIfAbsent(dataPrefix + BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(dCacheSize)); + String summaryPrefix = CacheType.DATA.getPropertyPrefix(factory.getCacheImplName()); + cc.setIfAbsent(summaryPrefix + CacheType.ENABLED_SUFFIX, Boolean.TRUE.toString()); + cc.setIfAbsent(summaryPrefix + BlockCacheConfiguration.BLOCK_SIZE_PROPERTY, Long.toString(blockSize)); + cc.setIfAbsent(summaryPrefix + BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(sCacheSize)); - copy.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(dCacheSize)); - _dCache = factory.getBlockCache(copy); - _dCache.start(); + factory.start(cc); - copy.set(BlockCacheConfiguration.MAX_SIZE_PROPERTY, Long.toString(sCacheSize)); - _sCache = factory.getBlockCache(copy); - _sCache.start(); + _iCache = factory.getBlockCache(CacheType.INDEX); + _dCache = factory.getBlockCache(CacheType.DATA); + _sCache = factory.getBlockCache(CacheType.SUMMARY); Runtime runtime = Runtime.getRuntime(); if (usingNativeMap) { @@ -552,9 +559,7 @@ public class TabletServerResourceManager { executorService.shutdown(); } - this._dCache.stop(); - this._iCache.stop(); - this._sCache.stop(); + this.factory.stop(); for (Entry<String,ExecutorService> entry : threadPools.entrySet()) { while (true) {
