Repository: accumulo Updated Branches: refs/heads/ACCUMULO-4463 d98fba14b -> 5af81142b
ACCUMULO-4463: more changes from review Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5af81142 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5af81142 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5af81142 Branch: refs/heads/ACCUMULO-4463 Commit: 5af81142be8cc412dcb2c1b0a12bf0b7ea895be0 Parents: d98fba1 Author: Dave Marion <[email protected]> Authored: Fri May 12 12:45:19 2017 -0400 Committer: Dave Marion <[email protected]> Committed: Fri May 12 12:45:19 2017 -0400 ---------------------------------------------------------------------- .../core/client/rfile/RFileScanner.java | 52 +++++---- .../accumulo/core/conf/ConfigurationCopy.java | 12 --- .../cache/BlockCacheConfiguration.java | 70 +++++++----- .../cache/BlockCacheConfigurationHelper.java | 52 --------- .../file/blockfile/cache/BlockCacheManager.java | 54 +++++++--- .../core/file/blockfile/cache/CachedBlock.java | 2 - .../file/blockfile/cache/lru/LruBlockCache.java | 21 +--- .../cache/lru/LruBlockCacheConfiguration.java | 39 ++++--- .../cache/lru/LruBlockCacheFactory.java | 43 -------- .../cache/lru/LruBlockCacheManager.java | 48 +++++++++ .../tinylfu/TinyLfuBlockCacheConfiguration.java | 6 +- .../cache/tinylfu/TinyLfuBlockCacheFactory.java | 45 -------- .../cache/tinylfu/TinyLfuBlockCacheManager.java | 37 +++++++ .../blockfile/cache/BlockCacheFactoryTest.java | 14 +-- .../cache/BlockConfigurationHelperTest.java | 14 ++- .../file/blockfile/cache/TestLruBlockCache.java | 106 ++++++++++--------- .../accumulo/core/file/rfile/RFileTest.java | 19 ++-- .../tserver/TabletServerResourceManager.java | 14 +-- 18 files changed, 318 insertions(+), 330 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 4410d47..42eba78 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -47,7 +47,6 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; import org.apache.accumulo.core.file.blockfile.cache.CacheEntry; import org.apache.accumulo.core.file.blockfile.cache.CacheType; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; @@ -70,7 +69,7 @@ class RFileScanner extends ScannerOptions implements Scanner { private static final Range EMPTY_RANGE = new Range(); private Range range; - private BlockCacheManager factory = null; + private BlockCacheManager blockCacheManager = null; private BlockCache dataCache = null; private BlockCache indexCache = null; private Opts opts; @@ -150,26 +149,26 @@ class RFileScanner extends ScannerOptions implements Scanner { } cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(CACHE_BLOCK_SIZE)); try { - factory = BlockCacheManager.getInstance(cc); + blockCacheManager = BlockCacheManager.getClientInstance(cc); + if (opts.indexCacheSize > 0) { + cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(opts.indexCacheSize)); + } + if (opts.dataCacheSize > 0) { + cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(opts.dataCacheSize)); + } + blockCacheManager.start(cc); + this.indexCache = blockCacheManager.getBlockCache(CacheType.INDEX); + this.dataCache = blockCacheManager.getBlockCache(CacheType.DATA); } catch (Exception e) { - throw new RuntimeException("Error creating BlockCacheFactory", e); - } - if (opts.indexCacheSize > 0) { - cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(opts.indexCacheSize)); - } - if (opts.dataCacheSize > 0) { - cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(opts.dataCacheSize)); - } - factory.start(cc); - this.indexCache = factory.getBlockCache(CacheType.INDEX); - if (null == indexCache) { - this.indexCache = new NoopCache(); - } - this.dataCache = factory.getBlockCache(CacheType.DATA); - if (null == this.dataCache) { - this.dataCache = new NoopCache(); + // FIXME: How do we report an error back to the user? } } + if (null == indexCache) { + this.indexCache = new NoopCache(); + } + if (null == this.dataCache) { + this.dataCache = new NoopCache(); + } } @Override @@ -350,14 +349,6 @@ class RFileScanner extends ScannerOptions implements Scanner { @Override public void close() { - if (dataCache instanceof LruBlockCache) { - ((LruBlockCache) dataCache).shutdown(); - } - - if (indexCache instanceof LruBlockCache) { - ((LruBlockCache) indexCache).shutdown(); - } - try { for (RFileSource source : opts.in.getSources()) { source.getInputStream().close(); @@ -365,5 +356,12 @@ class RFileScanner extends ScannerOptions implements Scanner { } catch (IOException e) { throw new RuntimeException(e); } + try { + if (null != this.blockCacheManager) { + this.blockCacheManager.stop(); + } + } catch (Exception e1) { + throw new RuntimeException(e1); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java index af937c3..cf3eb92 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationCopy.java @@ -95,16 +95,4 @@ public class ConfigurationCopy extends AccumuloConfiguration { copy.put(key, value); } - /** - * Sets a property in this configuration if it is not already defined. - * - * @param key - * key of property to set - * @param value - * property value - */ - public void setIfAbsent(String key, String value) { - copy.putIfAbsent(key, value); - } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java index a7e0828..d879f17 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfiguration.java @@ -18,20 +18,60 @@ package org.apache.accumulo.core.file.blockfile.cache; import java.util.Map; +import java.util.Optional; import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; public class BlockCacheConfiguration { + public static class BlockCacheConfigurationHelper { + + private final ConfigurationCopy conf; + private final String basePropertyName; + + protected BlockCacheConfigurationHelper(ConfigurationCopy conf, CacheType type, String implName) { + this.conf = conf; + this.basePropertyName = BlockCacheManager.CACHE_PROPERTY_BASE + implName + "." + type.name().toLowerCase() + "."; + } + + public String getPropertyPrefix() { + return basePropertyName; + } + + public String getFullPropertyName(String propertySuffix) { + return this.basePropertyName + propertySuffix; + } + + public Optional<String> get(String property) { + return Optional.ofNullable(this.conf.get(this.getFullPropertyName(property))); + } + + public void set(String propertySuffix, String value) { + conf.set(getFullPropertyName(propertySuffix), value); + } + + public ConfigurationCopy getConfiguration() { + return this.conf; + } + } + /** Maximum allowable size of cache (block put if size > max, evict) */ private final long maxSize; /** Approximate block size */ private final long blockSize; + /** Helper object for working with block cache configuration **/ + private final BlockCacheConfigurationHelper helper; + public BlockCacheConfiguration(AccumuloConfiguration conf, CacheType type, String implName) { + Map<String,String> props = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX); + ConfigurationCopy blockCacheConfiguration = new ConfigurationCopy(props); + this.helper = new BlockCacheConfigurationHelper(blockCacheConfiguration, type, implName); + switch (type) { case INDEX: this.maxSize = conf.getAsBytes(Property.TSERV_INDEXCACHE_SIZE); @@ -43,12 +83,15 @@ public class BlockCacheConfiguration { this.maxSize = conf.getAsBytes(Property.TSERV_SUMMARYCACHE_SIZE); break; default: - this.maxSize = conf.getAsBytes(Property.TSERV_DEFAULT_BLOCKSIZE); - break; + throw new IllegalArgumentException("Unknown block cache type"); } this.blockSize = conf.getAsBytes(Property.TSERV_DEFAULT_BLOCKSIZE); } + public BlockCacheConfigurationHelper getHelper() { + return this.helper; + } + public long getMaxSize() { return this.maxSize; } @@ -62,27 +105,4 @@ public class BlockCacheConfiguration { return "maxSize: " + getMaxSize() + ", blockSize: " + getBlockSize(); } - @SuppressWarnings("unchecked") - protected <T> T getOrDefault(Map<String,String> props, String propertyName, T defaultValue) { - String o = props.get(propertyName); - if (null == o && defaultValue == null) { - throw new RuntimeException("Property " + propertyName + " not specified and no default supplied."); - } else if (null == o) { - return defaultValue; - } else { - if (defaultValue.getClass().equals(Integer.class)) { - return (T) Integer.valueOf(o); - } else if (defaultValue.getClass().equals(Long.class)) { - return (T) Long.valueOf(o); - } else if (defaultValue.getClass().equals(Float.class)) { - return (T) Float.valueOf(o); - } else if (defaultValue.getClass().equals(Boolean.class)) { - return (T) Boolean.valueOf(o); - } else { - throw new RuntimeException("Unknown parameter type"); - } - } - - } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfigurationHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfigurationHelper.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfigurationHelper.java deleted file mode 100644 index 84b90e6..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheConfigurationHelper.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file.blockfile.cache; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.ConfigurationCopy; - -public class BlockCacheConfigurationHelper { - - private final ConfigurationCopy conf; - private String basePropertyName; - - public BlockCacheConfigurationHelper(AccumuloConfiguration conf, CacheType type, String implName) { - this(new ConfigurationCopy(conf), type, implName); - } - - public BlockCacheConfigurationHelper(ConfigurationCopy conf, CacheType type, String implName) { - this.conf = conf; - this.basePropertyName = BlockCacheManager.CACHE_PROPERTY_BASE + implName + "." + type.name().toLowerCase() + "."; - } - - public String getPropertyPrefix() { - return basePropertyName; - } - - public String getFullPropertyName(String propertySuffix) { - return this.basePropertyName + propertySuffix; - } - - public void setIfAbsent(String propertySuffix, String value) { - conf.setIfAbsent(getFullPropertyName(propertySuffix), value); - } - - public ConfigurationCopy getConfiguration() { - return this.conf; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java index a3a9b32..b1da718 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheManager.java @@ -31,13 +31,13 @@ public abstract class BlockCacheManager { public static final String CACHE_PROPERTY_BASE = Property.GENERAL_ARBITRARY_PROP_PREFIX + "cache.block."; private static final Logger LOG = LoggerFactory.getLogger(BlockCacheManager.class); - private static BlockCacheManager factory = null; + private static BlockCacheManager manager = null; - private final Map<CacheType,BlockCache> caches = new HashMap<>(); + protected final Map<CacheType,BlockCache> caches = new HashMap<>(); /** * Initialize the caches for each CacheType based on the configuration - * + * * @param conf * accumulo configuration */ @@ -51,11 +51,18 @@ public abstract class BlockCacheManager { /** * Stop caches and release resources */ - public void stop() {} + public void stop() { + this.caches.clear(); + close(); + } + + private static synchronized void close() { + manager = null; + } /** * Get the block cache of the given type - * + * * @param type * block cache type * @return BlockCache or null if not enabled @@ -66,7 +73,7 @@ public abstract class BlockCacheManager { /** * Create a block cache using the supplied configuration - * + * * @param conf * cache configuration * @return configured block cache @@ -74,28 +81,41 @@ public abstract class BlockCacheManager { protected abstract BlockCache createCache(AccumuloConfiguration conf, CacheType type); /** - * Cache implementation name (e.g lru, tinylfu, etc) - * - * @return name of cache implementation in lowercase + * Get the BlockCacheFactory specified by the property 'tserver.cache.factory.class' using the AccumuloVFSClassLoader + * + * @param conf + * accumulo configuration + * @return block cache manager instance + * @throws Exception + * error loading block cache manager implementation class */ - public abstract String getCacheImplName(); + public static synchronized BlockCacheManager getInstance(AccumuloConfiguration conf) throws Exception { + if (null == manager) { + String impl = conf.get(Property.TSERV_CACHE_FACTORY_IMPL); + Class<? extends BlockCacheManager> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCacheManager.class); + manager = (BlockCacheManager) clazz.newInstance(); + LOG.info("Created new block cache manager of type: {}", clazz.getSimpleName()); + } + return manager; + } /** * Get the BlockCacheFactory specified by the property 'tserver.cache.factory.class' - * + * * @param conf * accumulo configuration - * @return BlockCacheFactory instance + * @return block cache manager instance * @throws Exception + * error loading block cache manager implementation class */ - public static synchronized BlockCacheManager getInstance(AccumuloConfiguration conf) throws Exception { - if (null == factory) { + public static synchronized BlockCacheManager getClientInstance(AccumuloConfiguration conf) throws Exception { + if (null == manager) { String impl = conf.get(Property.TSERV_CACHE_FACTORY_IMPL); - Class<? extends BlockCacheManager> clazz = AccumuloVFSClassLoader.loadClass(impl, BlockCacheManager.class); - factory = (BlockCacheManager) clazz.newInstance(); + Class<? extends BlockCacheManager> clazz = Class.forName(impl).asSubclass(BlockCacheManager.class); + manager = (BlockCacheManager) clazz.newInstance(); LOG.info("Created new block cache factory of type: {}", clazz.getSimpleName()); } - return factory; + return manager; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java index b04b77a..44cea6b 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/CachedBlock.java @@ -19,8 +19,6 @@ package org.apache.accumulo.core.file.blockfile.cache; import java.util.Objects; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; - /** * Represents an entry in the {@link LruBlockCache}. * http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java index 25bc826..383b4d4 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCache.java @@ -91,16 +91,16 @@ public class LruBlockCache implements BlockCache, HeapSize { private final ScheduledExecutorService scheduleThreadPool = Executors.newScheduledThreadPool(1, new NamingThreadFactory("LRUBlockCacheStats")); /** Current size of cache */ - private AtomicLong size; + private final AtomicLong size; /** Current number of cached elements */ - private AtomicLong elements; + private final AtomicLong elements; /** Cache access count (sequential ID) */ - private AtomicLong count; + private final AtomicLong count; /** Cache statistics */ - private CacheStats stats; + private final CacheStats stats; /** Overhead of the structure itself */ private final long overhead; @@ -115,25 +115,12 @@ public class LruBlockCache implements BlockCache, HeapSize { * * @param conf * block cache configuration - * @param maxSize - * maximum size of cache, in bytes - * @param blockSize - * approximate size of each block, in bytes */ public LruBlockCache(final LruBlockCacheConfiguration conf) { this.conf = conf; int mapInitialSize = (int) Math.ceil(1.2 * conf.getMaxSize() / conf.getBlockSize()); - if (conf.getSingleFactor() + conf.getMultiFactor() + conf.getMemoryFactor() != 1) { - throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0"); - } - if (conf.getMinFactor() >= conf.getAcceptableFactor()) { - throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); - } - if (conf.getMinFactor() >= 1.0f || conf.getAcceptableFactor() >= 1.0f) { - throw new IllegalArgumentException("all factors must be < 1"); - } map = new ConcurrentHashMap<>(mapInitialSize, conf.getMapLoadFactor(), conf.getMapConcurrencyLevel()); this.stats = new CacheStats(); this.count = new AtomicLong(0); http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java index 377a849..7131d89 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheConfiguration.java @@ -17,16 +17,14 @@ */ package org.apache.accumulo.core.file.blockfile.cache.lru; -import java.util.Map; - import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration; -import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfigurationHelper; import org.apache.accumulo.core.file.blockfile.cache.CacheType; public final class LruBlockCacheConfiguration extends BlockCacheConfiguration { + public static final String PROPERTY_PREFIX = "lru"; + /** Default Configuration Parameters */ /** Backing Concurrent Map Configuration */ @@ -75,18 +73,27 @@ public final class LruBlockCacheConfiguration extends BlockCacheConfiguration { private final boolean useEvictionThread; - public LruBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type, String implName) { - super(conf, type, implName); - Map<String,String> props = conf.getAllPropertiesWithPrefix(Property.GENERAL_ARBITRARY_PROP_PREFIX); - BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(conf, type, implName); - this.acceptableFactor = getOrDefault(props, helper.getFullPropertyName(ACCEPTABLE_FACTOR_PROPERTY), DEFAULT_ACCEPTABLE_FACTOR); - this.minFactor = getOrDefault(props, helper.getFullPropertyName(MIN_FACTOR_PROPERTY), DEFAULT_MIN_FACTOR); - this.singleFactor = getOrDefault(props, helper.getFullPropertyName(SINGLE_FACTOR_PROPERTY), DEFAULT_SINGLE_FACTOR); - this.multiFactor = getOrDefault(props, helper.getFullPropertyName(MULTI_FACTOR_PROPERTY), DEFAULT_MULTI_FACTOR); - this.memoryFactor = getOrDefault(props, helper.getFullPropertyName(MEMORY_FACTOR_PROPERTY), DEFAULT_MEMORY_FACTOR); - this.mapLoadFactor = getOrDefault(props, helper.getFullPropertyName(MAP_LOAD_PROPERTY), DEFAULT_LOAD_FACTOR); - this.mapConcurrencyLevel = getOrDefault(props, helper.getFullPropertyName(MAP_CONCURRENCY_PROPERTY), DEFAULT_CONCURRENCY_LEVEL); - this.useEvictionThread = getOrDefault(props, helper.getFullPropertyName(EVICTION_THREAD_PROPERTY), Boolean.TRUE); + public LruBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type) { + super(conf, type, PROPERTY_PREFIX); + + this.acceptableFactor = this.getHelper().get(ACCEPTABLE_FACTOR_PROPERTY).map(Float::valueOf).filter(f -> f > 0).orElse(DEFAULT_ACCEPTABLE_FACTOR); + this.minFactor = this.getHelper().get(MIN_FACTOR_PROPERTY).map(Float::valueOf).filter(f -> f > 0).orElse(DEFAULT_MIN_FACTOR); + this.singleFactor = this.getHelper().get(SINGLE_FACTOR_PROPERTY).map(Float::valueOf).filter(f -> f > 0).orElse(DEFAULT_SINGLE_FACTOR); + this.multiFactor = this.getHelper().get(MULTI_FACTOR_PROPERTY).map(Float::valueOf).filter(f -> f > 0).orElse(DEFAULT_MULTI_FACTOR); + this.memoryFactor = this.getHelper().get(MEMORY_FACTOR_PROPERTY).map(Float::valueOf).filter(f -> f > 0).orElse(DEFAULT_MEMORY_FACTOR); + this.mapLoadFactor = this.getHelper().get(MAP_LOAD_PROPERTY).map(Float::valueOf).filter(f -> f > 0).orElse(DEFAULT_LOAD_FACTOR); + this.mapConcurrencyLevel = this.getHelper().get(MAP_CONCURRENCY_PROPERTY).map(Integer::valueOf).filter(i -> i > 0).orElse(DEFAULT_CONCURRENCY_LEVEL); + this.useEvictionThread = this.getHelper().get(EVICTION_THREAD_PROPERTY).map(Boolean::valueOf).orElse(true); + + if (this.getSingleFactor() + this.getMultiFactor() + this.getMemoryFactor() != 1) { + throw new IllegalArgumentException("Single, multi, and memory factors " + " should total 1.0"); + } + if (this.getMinFactor() >= this.getAcceptableFactor()) { + throw new IllegalArgumentException("minFactor must be smaller than acceptableFactor"); + } + if (this.getMinFactor() >= 1.0f || this.getAcceptableFactor() >= 1.0f) { + throw new IllegalArgumentException("all factors must be < 1"); + } } public float getAcceptableFactor() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java deleted file mode 100644 index 2ce115b..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file.blockfile.cache.lru; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.file.blockfile.cache.BlockCache; -import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; -import org.apache.accumulo.core.file.blockfile.cache.CacheType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LruBlockCacheFactory extends BlockCacheManager { - - private static final Logger LOG = LoggerFactory.getLogger(LruBlockCacheFactory.class); - - @Override - protected BlockCache createCache(AccumuloConfiguration conf, CacheType type) { - LruBlockCacheConfiguration cc = new LruBlockCacheConfiguration(conf, type, getCacheImplName()); - LOG.info("Creating {} cache with configuration {}", type, cc); - return new LruBlockCache(cc); - } - - @Override - public String getCacheImplName() { - return "lru"; - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java new file mode 100644 index 0000000..82d9142 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/lru/LruBlockCacheManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache.lru; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCache; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LruBlockCacheManager extends BlockCacheManager { + + private static final Logger LOG = LoggerFactory.getLogger(LruBlockCacheManager.class); + + @Override + protected BlockCache createCache(AccumuloConfiguration conf, CacheType type) { + LruBlockCacheConfiguration cc = new LruBlockCacheConfiguration(conf, type); + LOG.info("Creating {} cache with configuration {}", type, cc); + return new LruBlockCache(cc); + } + + @Override + public void stop() { + for (Entry<CacheType,BlockCache> e : this.caches.entrySet()) { + ((LruBlockCache) e.getValue()).shutdown(); + } + super.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java index 02d7b61..a67f164 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheConfiguration.java @@ -23,8 +23,10 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheType; public final class TinyLfuBlockCacheConfiguration extends BlockCacheConfiguration { - public TinyLfuBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type, String implName) { - super(conf, type, implName); + public static final String PROPERTY_PREFIX = "tinylfu"; + + public TinyLfuBlockCacheConfiguration(AccumuloConfiguration conf, CacheType type) { + super(conf, type, PROPERTY_PREFIX); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java deleted file mode 100644 index f3ca95c..0000000 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.file.blockfile.cache.tinylfu; - -import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; -import org.apache.accumulo.core.file.blockfile.cache.CacheType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TinyLfuBlockCacheFactory extends BlockCacheManager { - - private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCacheFactory.class); - - @Override - protected TinyLfuBlockCache createCache(AccumuloConfiguration conf, CacheType type) { - TinyLfuBlockCacheConfiguration cc = new TinyLfuBlockCacheConfiguration(conf, type, getCacheImplName()); - LOG.info("Creating {} cache with configuration {}", type, cc); - return new TinyLfuBlockCache(cc); - } - - @Override - public String getCacheImplName() { - return "tinylfu"; - } - - @Override - public void stop() {} - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java new file mode 100644 index 0000000..a68c4e6 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tinylfu/TinyLfuBlockCacheManager.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.file.blockfile.cache.tinylfu; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.CacheType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TinyLfuBlockCacheManager extends BlockCacheManager { + + private static final Logger LOG = LoggerFactory.getLogger(TinyLfuBlockCacheManager.class); + + @Override + protected TinyLfuBlockCache createCache(AccumuloConfiguration conf, CacheType type) { + TinyLfuBlockCacheConfiguration cc = new TinyLfuBlockCacheConfiguration(conf, type); + LOG.info("Creating {} cache with configuration {}", type, cc); + return new TinyLfuBlockCache(cc); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java index 39539c5..673382e 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactoryTest.java @@ -20,8 +20,8 @@ package org.apache.accumulo.core.file.blockfile.cache; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheFactory; -import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; +import org.apache.accumulo.core.file.blockfile.cache.tinylfu.TinyLfuBlockCacheManager; import org.junit.Assert; import org.junit.Test; @@ -31,7 +31,7 @@ public class BlockCacheFactoryTest { public void testCreateLruBlockCacheFactory() throws Exception { DefaultConfiguration dc = new DefaultConfiguration(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheManager.class.getName()); BlockCacheManager.getInstance(cc); } @@ -39,15 +39,15 @@ public class BlockCacheFactoryTest { public void testCreateTinyLfuBlockCacheFactory() throws Exception { DefaultConfiguration dc = new DefaultConfiguration(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_FACTORY_IMPL, TinyLfuBlockCacheFactory.class.getName()); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, TinyLfuBlockCacheManager.class.getName()); BlockCacheManager.getInstance(cc); } @Test public void testStartWithDefault() throws Exception { DefaultConfiguration dc = new DefaultConfiguration(); - BlockCacheManager factory = BlockCacheManager.getInstance(dc); - factory.start(dc); - Assert.assertNotNull(factory.getBlockCache(CacheType.INDEX)); + BlockCacheManager manager = BlockCacheManager.getInstance(dc); + manager.start(dc); + Assert.assertNotNull(manager.getBlockCache(CacheType.INDEX)); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java index 1dcd71a..d2901e3 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/BlockConfigurationHelperTest.java @@ -17,16 +17,26 @@ */ package org.apache.accumulo.core.file.blockfile.cache; +import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration; import org.junit.Assert; import org.junit.Test; public class BlockConfigurationHelperTest { + private BlockCacheConfiguration bcc = new BlockCacheConfiguration(new ConfigurationCopy(new DefaultConfiguration()), CacheType.DATA, + LruBlockCacheConfiguration.PROPERTY_PREFIX); + + @Test + public void testGetFullPropertyName() throws Exception { + Assert.assertEquals("general.custom.cache.block.lru.data.acceptable.factor", + bcc.getHelper().getFullPropertyName(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY)); + } + @Test public void testGetPropertyPrefix() throws Exception { - BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(new DefaultConfiguration(), CacheType.DATA, "lru"); - Assert.assertEquals("general.custom.cache.block.lru.data.", helper.getPropertyPrefix()); + Assert.assertEquals("general.custom.cache.block.lru.data.", bcc.getHelper().getPropertyPrefix()); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java index dc2c4a6..c986843 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java +++ b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java @@ -25,9 +25,10 @@ import junit.framework.TestCase; import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration.BlockCacheConfigurationHelper; import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheConfiguration; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; /** * Tests the concurrent LruBlockCache. @@ -45,12 +46,12 @@ public class TestLruBlockCache extends TestCase { DefaultConfiguration dc = new DefaultConfiguration(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); - BlockCacheManager factory = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); - factory.start(cc); - LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); + manager.start(cc); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] blocks = generateFixedBlocks(10, blockSize, "block"); @@ -67,6 +68,8 @@ public class TestLruBlockCache extends TestCase { } // A single eviction run should have occurred assertEquals(cache.getEvictionCount(), 1); + + manager.stop(); } public void testCacheSimple() throws Exception { @@ -76,12 +79,12 @@ public class TestLruBlockCache extends TestCase { DefaultConfiguration dc = new DefaultConfiguration(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); - BlockCacheManager factory = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); - factory.start(cc); - LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); + manager.start(cc); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] blocks = generateRandomBlocks(100, blockSize); @@ -123,6 +126,7 @@ public class TestLruBlockCache extends TestCase { // Thread t = new LruBlockCache.StatisticsThread(cache); // t.start(); // t.join(); + manager.stop(); } public void testCacheEvictionSimple() throws Exception { @@ -132,14 +136,14 @@ public class TestLruBlockCache extends TestCase { DefaultConfiguration dc = new DefaultConfiguration(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); - BlockCacheManager factory = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); - BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory.getCacheImplName()); - helper.setIfAbsent(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); - factory.start(helper.getConfiguration()); - LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, LruBlockCacheConfiguration.PROPERTY_PREFIX); + helper.set(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); + manager.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] blocks = generateFixedBlocks(10, blockSize, "block"); @@ -169,6 +173,7 @@ public class TestLruBlockCache extends TestCase { for (int i = 2; i < blocks.length; i++) { assertTrue(Arrays.equals(cache.getBlock(blocks[i].blockName).getBuffer(), blocks[i].buf)); } + manager.stop(); } public void testCacheEvictionTwoPriorities() throws Exception { @@ -178,19 +183,19 @@ public class TestLruBlockCache extends TestCase { DefaultConfiguration dc = new DefaultConfiguration(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); - BlockCacheManager factory = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); - BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory.getCacheImplName()); - helper.setIfAbsent(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); - helper.setIfAbsent(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.98f)); - helper.setIfAbsent(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); - helper.setIfAbsent(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.25f)); - helper.setIfAbsent(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.50f)); - helper.setIfAbsent(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.25f)); - factory.start(helper.getConfiguration()); - LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, LruBlockCacheConfiguration.PROPERTY_PREFIX); + helper.set(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); + helper.set(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.98f)); + helper.set(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); + helper.set(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.25f)); + helper.set(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.50f)); + helper.set(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.25f)); + manager.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] singleBlocks = generateFixedBlocks(5, 10000, "single"); Block[] multiBlocks = generateFixedBlocks(5, 10000, "multi"); @@ -237,6 +242,7 @@ public class TestLruBlockCache extends TestCase { assertTrue(Arrays.equals(cache.getBlock(singleBlocks[i].blockName).getBuffer(), singleBlocks[i].buf)); assertTrue(Arrays.equals(cache.getBlock(multiBlocks[i].blockName).getBuffer(), multiBlocks[i].buf)); } + manager.stop(); } public void testCacheEvictionThreePriorities() throws Exception { @@ -246,19 +252,19 @@ public class TestLruBlockCache extends TestCase { DefaultConfiguration dc = new DefaultConfiguration(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); - BlockCacheManager factory = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); - BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory.getCacheImplName()); - helper.setIfAbsent(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); - helper.setIfAbsent(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.98f)); - helper.setIfAbsent(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); - helper.setIfAbsent(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.33f)); - helper.setIfAbsent(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.33f)); - helper.setIfAbsent(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.34f)); - factory.start(helper.getConfiguration()); - LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, LruBlockCacheConfiguration.PROPERTY_PREFIX); + helper.set(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); + helper.set(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.98f)); + helper.set(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); + helper.set(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.33f)); + helper.set(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.33f)); + helper.set(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.34f)); + manager.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] singleBlocks = generateFixedBlocks(5, blockSize, "single"); Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); @@ -363,6 +369,7 @@ public class TestLruBlockCache extends TestCase { assertEquals(null, cache.getBlock(memoryBlocks[2].blockName)); assertEquals(null, cache.getBlock(memoryBlocks[3].blockName)); + manager.stop(); } // test scan resistance @@ -373,20 +380,20 @@ public class TestLruBlockCache extends TestCase { DefaultConfiguration dc = new DefaultConfiguration(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); - BlockCacheManager factory = BlockCacheManager.getInstance(cc); + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheManager.class.getName()); + BlockCacheManager manager = BlockCacheManager.getInstance(cc); cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(blockSize)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(maxSize)); - BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, factory.getCacheImplName()); - helper.setIfAbsent(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); - helper.setIfAbsent(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); - helper.setIfAbsent(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.66f)); - helper.setIfAbsent(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); - helper.setIfAbsent(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.33f)); - helper.setIfAbsent(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.33f)); - helper.setIfAbsent(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.34f)); - factory.start(helper.getConfiguration()); - LruBlockCache cache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); + BlockCacheConfigurationHelper helper = new BlockCacheConfigurationHelper(cc, CacheType.INDEX, LruBlockCacheConfiguration.PROPERTY_PREFIX); + helper.set(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); + helper.set(LruBlockCacheConfiguration.EVICTION_THREAD_PROPERTY, Boolean.FALSE.toString()); + helper.set(LruBlockCacheConfiguration.MIN_FACTOR_PROPERTY, Float.toString(0.66f)); + helper.set(LruBlockCacheConfiguration.ACCEPTABLE_FACTOR_PROPERTY, Float.toString(0.99f)); + helper.set(LruBlockCacheConfiguration.SINGLE_FACTOR_PROPERTY, Float.toString(0.33f)); + helper.set(LruBlockCacheConfiguration.MULTI_FACTOR_PROPERTY, Float.toString(0.33f)); + helper.set(LruBlockCacheConfiguration.MEMORY_FACTOR_PROPERTY, Float.toString(0.34f)); + manager.start(helper.getConfiguration()); + LruBlockCache cache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); Block[] singleBlocks = generateFixedBlocks(20, blockSize, "single"); Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi"); @@ -432,6 +439,7 @@ public class TestLruBlockCache extends TestCase { // Should now have 7 total blocks assertEquals(7, cache.size()); + manager.stop(); } private Block[] generateFixedBlocks(int numBlocks, int size, String pfx) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 442e62d..995c369 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -61,7 +61,7 @@ import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager; import org.apache.accumulo.core.file.blockfile.cache.CacheType; import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCache; -import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheFactory; +import org.apache.accumulo.core.file.blockfile.cache.lru.LruBlockCacheManager; import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile; import org.apache.accumulo.core.file.rfile.RFile.Reader; import org.apache.accumulo.core.file.streams.PositionedOutputs; @@ -209,6 +209,7 @@ public class RFileTest { protected AccumuloConfiguration accumuloConfiguration; public Reader reader; public SortedKeyValueIterator<Key,Value> iter; + private BlockCacheManager manager; public TestRFile(AccumuloConfiguration accumuloConfiguration) { this.accumuloConfiguration = accumuloConfiguration; @@ -270,19 +271,18 @@ public class RFileTest { DefaultConfiguration dc = new DefaultConfiguration(); ConfigurationCopy cc = new ConfigurationCopy(dc); - cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheFactory.class.getName()); - BlockCacheManager factory; + cc.set(Property.TSERV_CACHE_FACTORY_IMPL, LruBlockCacheManager.class.getName()); try { - factory = BlockCacheManager.getInstance(cc); + manager = BlockCacheManager.getInstance(cc); } catch (Exception e) { - throw new RuntimeException("Error creating BlockCacheFactory", e); + throw new RuntimeException("Error creating BlockCacheManager", e); } cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(100000)); cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(100000000)); cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(100000000)); - factory.start(cc); - LruBlockCache indexCache = (LruBlockCache) factory.getBlockCache(CacheType.INDEX); - LruBlockCache dataCache = (LruBlockCache) factory.getBlockCache(CacheType.DATA); + manager.start(cc); + LruBlockCache indexCache = (LruBlockCache) manager.getBlockCache(CacheType.INDEX); + LruBlockCache dataCache = (LruBlockCache) manager.getBlockCache(CacheType.DATA); CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, fileLength, conf, dataCache, indexCache, AccumuloConfiguration.getDefaultConfiguration()); reader = new RFile.Reader(_cbr); @@ -295,6 +295,9 @@ public class RFileTest { public void closeReader() throws IOException { reader.close(); in.close(); + if (null != manager) { + manager.stop(); + } } public void seek(Key nk) throws IOException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/5af81142/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 4e79eea..fc0665c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -178,7 +178,7 @@ public class TabletServerResourceManager { try { cacheManager = BlockCacheManager.getInstance(acuConf); } catch (Exception e) { - throw new RuntimeException("Error creating BlockCacheFactory", e); + throw new RuntimeException("Error creating BlockCacheManager", e); } cacheManager.start(acuConf); @@ -542,11 +542,13 @@ public class TabletServerResourceManager { executorService.shutdown(); } - try { - this.cacheManager.stop(); - } catch (Exception ex) { - log.error("Error stopping BlockCacheManager", ex); - } + if (null != this.cacheManager) { + try { + this.cacheManager.stop(); + } catch (Exception ex) { + log.error("Error stopping BlockCacheManager", ex); + } + } for (Entry<String,ExecutorService> entry : threadPools.entrySet()) { while (true) {
