Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-4463 [created] 03a8f2acf


ACCUMULO-4463: Make block cache implentation configurable


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/03a8f2ac
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/03a8f2ac
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/03a8f2ac

Branch: refs/heads/ACCUMULO-4463
Commit: 03a8f2acfb4f95b545c47a4e681dd5b8fa585c94
Parents: 2e171cd
Author: Dave Marion <[email protected]>
Authored: Mon May 8 11:47:53 2017 -0400
Committer: Dave Marion <[email protected]>
Committed: Mon May 8 11:47:53 2017 -0400

----------------------------------------------------------------------
 .../core/client/rfile/RFileScanner.java         |  20 ++-
 .../org/apache/accumulo/core/conf/Property.java |   3 +-
 .../core/file/blockfile/cache/BlockCache.java   |  20 +++
 .../file/blockfile/cache/BlockCacheFactory.java |  32 ++++
 .../file/blockfile/cache/LruBlockCache.java     | 155 ++++++++++++-------
 .../file/blockfile/cache/TinyLfuBlockCache.java |  11 +-
 .../accumulo/core/summary/SummaryReader.java    |   6 +
 .../file/blockfile/cache/TestLruBlockCache.java |  76 +++++----
 .../accumulo/core/file/rfile/RFileTest.java     |   6 +-
 .../tserver/TabletServerResourceManager.java    |  27 ++--
 10 files changed, 251 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java 
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index 1b12fb6..57322a8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -127,6 +127,12 @@ class RFileScanner extends ScannerOptions implements 
Scanner {
         }
       };
     }
+
+    @Override
+    public void start(AccumuloConfiguration conf, long maxSize, long 
blockSize) throws Exception {}
+
+    @Override
+    public void stop() {}
   }
 
   RFileScanner(Opts opts) {
@@ -136,13 +142,23 @@ class RFileScanner extends ScannerOptions implements 
Scanner {
 
     this.opts = opts;
     if (opts.indexCacheSize > 0) {
-      this.indexCache = new LruBlockCache(opts.indexCacheSize, 
CACHE_BLOCK_SIZE);
+      this.indexCache = new LruBlockCache();
+      try {
+        this.indexCache.start((AccumuloConfiguration) null, 
opts.indexCacheSize, CACHE_BLOCK_SIZE);
+      } catch (Exception e) {
+        throw new RuntimeException("Error starting cache", e);
+      }
     } else {
       this.indexCache = new NoopCache();
     }
 
     if (opts.dataCacheSize > 0) {
-      this.dataCache = new LruBlockCache(opts.dataCacheSize, CACHE_BLOCK_SIZE);
+      this.dataCache = new LruBlockCache();
+      try {
+        this.dataCache.start((AccumuloConfiguration) null, opts.dataCacheSize, 
CACHE_BLOCK_SIZE);
+      } catch (Exception e) {
+        throw new RuntimeException("Error starting cache", e);
+      }
     } else {
       this.dataCache = new NoopCache();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 5480867..0bbaf10 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -245,7 +245,8 @@ public enum Property {
   TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this 
category affect the behavior of the tablet servers"),
   TSERV_CLIENT_TIMEOUT("tserver.client.timeout", "3s", 
PropertyType.TIMEDURATION, "Time to wait for clients to continue scans before 
closing a session."),
   TSERV_DEFAULT_BLOCKSIZE("tserver.default.blocksize", "1M", 
PropertyType.BYTES, "Specifies a default blocksize for the tserver caches"),
-  TSERV_CACHE_POLICY("tserver.cache.policy", "LRU", PropertyType.STRING, 
"Specifies the eviction policy of the file data caches (LRU or TinyLFU)."),
+  TSERV_CACHE_IMPL("tserver.cache.class", 
"org.apache.accumulo.core.file.blockfile.cache.LruBlockCache.class", 
PropertyType.STRING,
+      "Specifies the class name of the block cache implementation."),
   TSERV_DATACACHE_SIZE("tserver.cache.data.size", "10%", PropertyType.MEMORY, 
"Specifies the size of the cache for file data blocks."),
   TSERV_INDEXCACHE_SIZE("tserver.cache.index.size", "25%", 
PropertyType.MEMORY, "Specifies the size of the cache for file indices."),
   TSERV_SUMMARYCACHE_SIZE("tserver.cache.summary.size", "10%", 
PropertyType.MEMORY, "Specifies the size of the cache for summary data on each 
tablet server."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
index 82f8b1e..c5a17e4 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCache.java
@@ -17,10 +17,30 @@
  */
 package org.apache.accumulo.core.file.blockfile.cache;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+
 /**
  * Block cache interface.
  */
 public interface BlockCache {
+
+  /**
+   * Start the block cache
+   *
+   * @param conf
+   *          Accumulo configuration object
+   * @param maxSize
+   *          maximum size of the on-heap cache
+   * @param blockSize
+   *          size of the default RFile blocks
+   */
+  void start(AccumuloConfiguration conf, long maxSize, long blockSize) throws 
Exception;
+
+  /**
+   * Stop the block cache and release resources
+   */
+  void stop();
+
   /**
    * Add block to cache.
    *

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java
new file mode 100644
index 0000000..70cbc7d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/BlockCacheFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
+
+public class BlockCacheFactory {
+
+  public static BlockCache getBlockCache(AccumuloConfiguration conf) throws 
Exception {
+    String impl = conf.get(Property.TSERV_CACHE_IMPL);
+    Class<? extends BlockCache> clazz = AccumuloVFSClassLoader.loadClass(impl, 
BlockCache.class);
+    return clazz.newInstance();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
index cbdaca5..921b5a5 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/LruBlockCache.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +85,7 @@ public class LruBlockCache implements BlockCache, HeapSize {
   static final int statThreadPeriod = 60;
 
   /** Concurrent map (the cache) */
-  private final ConcurrentHashMap<String,CachedBlock> map;
+  private ConcurrentHashMap<String,CachedBlock> map;
 
   /** Eviction lock (locked when eviction in process) */
   private final ReentrantLock evictionLock = new ReentrantLock(true);
@@ -93,22 +94,22 @@ public class LruBlockCache implements BlockCache, HeapSize {
   private volatile boolean evictionInProgress = false;
 
   /** Eviction thread */
-  private final EvictionThread evictionThread;
+  private EvictionThread evictionThread;
 
   /** Statistics thread schedule pool (for heavy debugging, could remove) */
   private final ScheduledExecutorService scheduleThreadPool = 
Executors.newScheduledThreadPool(1, new 
NamingThreadFactory("LRUBlockCacheStats"));
 
   /** Current size of cache */
-  private final AtomicLong size;
+  private AtomicLong size;
 
   /** Current number of cached elements */
-  private final AtomicLong elements;
+  private AtomicLong elements;
 
   /** Cache access count (sequential ID) */
-  private final AtomicLong count;
+  private AtomicLong count;
 
   /** Cache statistics */
-  private final CacheStats stats;
+  private CacheStats stats;
 
   /** Maximum allowable size of cache (block put if size > max, evict) */
   private long maxSize;
@@ -117,74 +118,47 @@ public class LruBlockCache implements BlockCache, 
HeapSize {
   private long blockSize;
 
   /** Acceptable size of cache (no evictions if size < acceptable) */
-  private float acceptableFactor;
+  private float acceptableFactor = DEFAULT_ACCEPTABLE_FACTOR;
 
   /** Minimum threshold of cache (when evicting, evict until size < min) */
-  private float minFactor;
+  private float minFactor = DEFAULT_MIN_FACTOR;
 
   /** Single access bucket size */
-  private float singleFactor;
+  private float singleFactor = DEFAULT_SINGLE_FACTOR;
 
   /** Multiple access bucket size */
-  private float multiFactor;
+  private float multiFactor = DEFAULT_MULTI_FACTOR;
 
   /** In-memory bucket size */
-  private float memoryFactor;
+  private float memoryFactor = DEFAULT_MEMORY_FACTOR;
+
+  /** LruBlockCache cache = new LruBlockCache **/
+  private float mapLoadFactor = DEFAULT_LOAD_FACTOR;
+
+  /** LruBlockCache cache = new LruBlockCache **/
+  private int mapConcurrencyLevel = DEFAULT_CONCURRENCY_LEVEL;
 
   /** Overhead of the structure itself */
   private long overhead;
 
+  private boolean useEvictionThread = true;
+
   /**
    * Default constructor. Specify maximum size and expected average block size 
(approximation is fine).
    *
    * <p>
    * All other factors will be calculated based on defaults specified in this 
class.
    *
+   * @param conf
+   *          accumulo configuration
    * @param maxSize
    *          maximum size of cache, in bytes
    * @param blockSize
    *          approximate size of each block, in bytes
    */
-  public LruBlockCache(long maxSize, long blockSize) {
-    this(maxSize, blockSize, true);
-  }
-
-  /**
-   * Constructor used for testing. Allows disabling of the eviction thread.
-   */
-  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread) {
-    this(maxSize, blockSize, evictionThread, (int) Math.ceil(1.2 * maxSize / 
blockSize), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL, DEFAULT_MIN_FACTOR,
-        DEFAULT_ACCEPTABLE_FACTOR, DEFAULT_SINGLE_FACTOR, 
DEFAULT_MULTI_FACTOR, DEFAULT_MEMORY_FACTOR);
-  }
+  public void start(AccumuloConfiguration conf, long maxSize, long blockSize) {
+    int mapInitialSize = (int) Math.ceil(1.2 * maxSize / blockSize);
 
-  /**
-   * Configurable constructor. Use this constructor if not using defaults.
-   *
-   * @param maxSize
-   *          maximum size of this cache, in bytes
-   * @param blockSize
-   *          expected average size of blocks, in bytes
-   * @param evictionThread
-   *          whether to run evictions in a bg thread or not
-   * @param mapInitialSize
-   *          initial size of backing ConcurrentHashMap
-   * @param mapLoadFactor
-   *          initial load factor of backing ConcurrentHashMap
-   * @param mapConcurrencyLevel
-   *          initial concurrency factor for backing CHM
-   * @param minFactor
-   *          percentage of total size that eviction will evict until
-   * @param acceptableFactor
-   *          percentage of total size that triggers eviction
-   * @param singleFactor
-   *          percentage of total size for single-access blocks
-   * @param multiFactor
-   *          percentage of total size for multiple-access blocks
-   * @param memoryFactor
-   *          percentage of total size for in-memory blocks
-   */
-  public LruBlockCache(long maxSize, long blockSize, boolean evictionThread, 
int mapInitialSize, float mapLoadFactor, int mapConcurrencyLevel, float 
minFactor,
-      float acceptableFactor, float singleFactor, float multiFactor, float 
memoryFactor) {
     if (singleFactor + multiFactor + memoryFactor != 1) {
       throw new IllegalArgumentException("Single, multi, and memory factors " 
+ " should total 1.0");
     }
@@ -197,18 +171,13 @@ public class LruBlockCache implements BlockCache, 
HeapSize {
     this.maxSize = maxSize;
     this.blockSize = blockSize;
     map = new ConcurrentHashMap<>(mapInitialSize, mapLoadFactor, 
mapConcurrencyLevel);
-    this.minFactor = minFactor;
-    this.acceptableFactor = acceptableFactor;
-    this.singleFactor = singleFactor;
-    this.multiFactor = multiFactor;
-    this.memoryFactor = memoryFactor;
     this.stats = new CacheStats();
     this.count = new AtomicLong(0);
     this.elements = new AtomicLong(0);
     this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
     this.size = new AtomicLong(this.overhead);
 
-    if (evictionThread) {
+    if (useEvictionThread) {
       this.evictionThread = new EvictionThread(this);
       this.evictionThread.start();
       while (!this.evictionThread.running()) {
@@ -224,6 +193,80 @@ public class LruBlockCache implements BlockCache, HeapSize 
{
     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), 
statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
   }
 
+  public void stop() {}
+
+  public float getMinFactor() {
+    return minFactor;
+  }
+
+  public float getSingleFactor() {
+    return singleFactor;
+  }
+
+  public float getMultiFactor() {
+    return multiFactor;
+  }
+
+  public float getMemoryFactor() {
+    return memoryFactor;
+  }
+
+  public float getMapLoadFactor() {
+    return mapLoadFactor;
+  }
+
+  public int getMapConcurrencyLevel() {
+    return mapConcurrencyLevel;
+  }
+
+  public long getOverhead() {
+    return overhead;
+  }
+
+  public boolean isUseEvictionThread() {
+    return useEvictionThread;
+  }
+
+  public void setMinFactor(float minFactor) {
+    this.minFactor = minFactor;
+  }
+
+  public void setSingleFactor(float singleFactor) {
+    this.singleFactor = singleFactor;
+  }
+
+  public void setMultiFactor(float multiFactor) {
+    this.multiFactor = multiFactor;
+  }
+
+  public void setMemoryFactor(float memoryFactor) {
+    this.memoryFactor = memoryFactor;
+  }
+
+  public void setMapLoadFactor(float mapLoadFactor) {
+    this.mapLoadFactor = mapLoadFactor;
+  }
+
+  public void setMapConcurrencyLevel(int mapConcurrencyLevel) {
+    this.mapConcurrencyLevel = mapConcurrencyLevel;
+  }
+
+  public void setOverhead(long overhead) {
+    this.overhead = overhead;
+  }
+
+  public void setUseEvictionThread(boolean useEvictionThread) {
+    this.useEvictionThread = useEvictionThread;
+  }
+
+  public float getAcceptableFactor() {
+    return acceptableFactor;
+  }
+
+  public void setAcceptableFactor(float acceptableFactor) {
+    this.acceptableFactor = acceptableFactor;
+  }
+
   public void setMaxSize(long maxSize) {
     this.maxSize = maxSize;
     if (this.size.get() > acceptableSize() && !evictionInProgress) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
index bab52af..ef2f664 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/TinyLfuBlockCache.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,11 +46,11 @@ public final class TinyLfuBlockCache implements BlockCache {
   private static final Logger log = 
LoggerFactory.getLogger(TinyLfuBlockCache.class);
   private static final int STATS_PERIOD_SEC = 60;
 
-  private final Cache<String,Block> cache;
-  private final Policy.Eviction<String,Block> policy;
-  private final ScheduledExecutorService statsExecutor;
+  private Cache<String,Block> cache;
+  private Policy.Eviction<String,Block> policy;
+  private ScheduledExecutorService statsExecutor;
 
-  public TinyLfuBlockCache(long maxSize, long blockSize) {
+  public void start(AccumuloConfiguration conf, long maxSize, long blockSize) {
     cache = Caffeine.newBuilder().initialCapacity((int) Math.ceil(1.2 * 
maxSize / blockSize)).weigher((String blockName, Block block) -> {
       int keyWeight = ClassSize.align(blockName.length()) + ClassSize.STRING;
       return keyWeight + block.weight();
@@ -61,6 +62,8 @@ public final class TinyLfuBlockCache implements BlockCache {
     statsExecutor.scheduleAtFixedRate(this::logStats, STATS_PERIOD_SEC, 
STATS_PERIOD_SEC, TimeUnit.SECONDS);
   }
 
+  public void stop() {}
+
   @Override
   public long getMaxSize() {
     return policy.getMaximum();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java 
b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
index 9b2b5d9..73cabf2 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java
@@ -89,6 +89,12 @@ public class SummaryReader {
     public Stats getStats() {
       return summaryCache.getStats();
     }
+
+    @Override
+    public void start(AccumuloConfiguration conf, long maxSize, long 
blockSize) throws Exception {}
+
+    @Override
+    public void stop() {}
   }
 
   private static List<SummarySerializer> load(BlockReader bcReader, 
Predicate<SummarizerConfiguration> summarySelector) throws IOException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
 
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
index a5ab14a..b5624c9 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestLruBlockCache.java
@@ -22,6 +22,8 @@ import java.util.Random;
 
 import junit.framework.TestCase;
 
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+
 /**
  * Tests the concurrent LruBlockCache.
  * <p>
@@ -36,7 +38,8 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 100000;
     long blockSize = calculateBlockSizeDefault(maxSize, 9); // room for 9, 
will evict
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize);
+    LruBlockCache cache = new LruBlockCache();
+    cache.start((AccumuloConfiguration) null, maxSize, blockSize);
 
     Block[] blocks = generateFixedBlocks(10, blockSize, "block");
 
@@ -60,7 +63,8 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 1000000;
     long blockSize = calculateBlockSizeDefault(maxSize, 101);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize);
+    LruBlockCache cache = new LruBlockCache();
+    cache.start((AccumuloConfiguration) null, maxSize, blockSize);
 
     Block[] blocks = generateRandomBlocks(100, blockSize);
 
@@ -109,7 +113,9 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 100000;
     long blockSize = calculateBlockSizeDefault(maxSize, 10);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false);
+    LruBlockCache cache = new LruBlockCache();
+    cache.setUseEvictionThread(false);
+    cache.start((AccumuloConfiguration) null, maxSize, blockSize);
 
     Block[] blocks = generateFixedBlocks(10, blockSize, "block");
 
@@ -146,12 +152,16 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 100000;
     long blockSize = calculateBlockSizeDefault(maxSize, 10);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) 
Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
-        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
-        0.99f, // acceptable
-        0.25f, // single
-        0.50f, // multi
-        0.25f);// memory
+    LruBlockCache cache = new LruBlockCache();
+    cache.setUseEvictionThread(false);
+    cache.setMapLoadFactor(LruBlockCache.DEFAULT_LOAD_FACTOR);
+    cache.setMapConcurrencyLevel(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL);
+    cache.setMinFactor(0.98f);
+    cache.setAcceptableFactor(0.99f);
+    cache.setSingleFactor(0.25f);
+    cache.setMultiFactor(0.50f);
+    cache.setMemoryFactor(0.25f);
+    cache.start((AccumuloConfiguration) null, maxSize, blockSize);
 
     Block[] singleBlocks = generateFixedBlocks(5, 10000, "single");
     Block[] multiBlocks = generateFixedBlocks(5, 10000, "multi");
@@ -205,12 +215,16 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 100000;
     long blockSize = calculateBlockSize(maxSize, 10);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) 
Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
-        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
-        0.99f, // acceptable
-        0.33f, // single
-        0.33f, // multi
-        0.34f);// memory
+    LruBlockCache cache = new LruBlockCache();
+    cache.setUseEvictionThread(false);
+    cache.setMapLoadFactor(LruBlockCache.DEFAULT_LOAD_FACTOR);
+    cache.setMapConcurrencyLevel(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL);
+    cache.setMinFactor(0.98f);
+    cache.setAcceptableFactor(0.99f);
+    cache.setSingleFactor(0.33f);
+    cache.setMultiFactor(0.33f);
+    cache.setMemoryFactor(0.34f);
+    cache.start((AccumuloConfiguration) null, maxSize, blockSize);
 
     Block[] singleBlocks = generateFixedBlocks(5, blockSize, "single");
     Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
@@ -323,12 +337,16 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 100000;
     long blockSize = calculateBlockSize(maxSize, 10);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) 
Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
-        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.66f, // min
-        0.99f, // acceptable
-        0.33f, // single
-        0.33f, // multi
-        0.34f);// memory
+    LruBlockCache cache = new LruBlockCache();
+    cache.setUseEvictionThread(false);
+    cache.setMapLoadFactor(LruBlockCache.DEFAULT_LOAD_FACTOR);
+    cache.setMapConcurrencyLevel(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL);
+    cache.setMinFactor(0.66f);
+    cache.setAcceptableFactor(0.99f);
+    cache.setSingleFactor(0.33f);
+    cache.setMultiFactor(0.33f);
+    cache.setMemoryFactor(0.34f);
+    cache.start((AccumuloConfiguration) null, maxSize, blockSize);
 
     Block[] singleBlocks = generateFixedBlocks(20, blockSize, "single");
     Block[] multiBlocks = generateFixedBlocks(5, blockSize, "multi");
@@ -382,12 +400,16 @@ public class TestLruBlockCache extends TestCase {
     long maxSize = 300000;
     long blockSize = calculateBlockSize(maxSize, 31);
 
-    LruBlockCache cache = new LruBlockCache(maxSize, blockSize, false, (int) 
Math.ceil(1.2 * maxSize / blockSize), LruBlockCache.DEFAULT_LOAD_FACTOR,
-        LruBlockCache.DEFAULT_CONCURRENCY_LEVEL, 0.98f, // min
-        0.99f, // acceptable
-        0.33f, // single
-        0.33f, // multi
-        0.34f);// memory
+    LruBlockCache cache = new LruBlockCache();
+    cache.setUseEvictionThread(false);
+    cache.setMapLoadFactor(LruBlockCache.DEFAULT_LOAD_FACTOR);
+    cache.setMapConcurrencyLevel(LruBlockCache.DEFAULT_CONCURRENCY_LEVEL);
+    cache.setMinFactor(0.98f);
+    cache.setAcceptableFactor(0.99f);
+    cache.setSingleFactor(0.33f);
+    cache.setMultiFactor(0.33f);
+    cache.setMemoryFactor(0.34f);
+    cache.start((AccumuloConfiguration) null, maxSize, blockSize);
 
     Block[] singleBlocks = generateFixedBlocks(10, blockSize, "single");
     Block[] multiBlocks = generateFixedBlocks(10, blockSize, "multi");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java 
b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index fc43ef1..5cec5eb 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -264,8 +264,10 @@ public class RFileTest {
       in = new FSDataInputStream(bais);
       fileLength = data.length;
 
-      LruBlockCache indexCache = new LruBlockCache(100000000, 100000);
-      LruBlockCache dataCache = new LruBlockCache(100000000, 100000);
+      LruBlockCache indexCache = new LruBlockCache();
+      indexCache.start((AccumuloConfiguration) null, 100000000, 100000);
+      LruBlockCache dataCache = new LruBlockCache();
+      dataCache.start((AccumuloConfiguration) null, 100000000, 100000);
 
       CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(in, 
fileLength, conf, dataCache, indexCache, 
AccumuloConfiguration.getDefaultConfiguration());
       reader = new RFile.Reader(_cbr);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/03a8f2ac/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index b84997d..476a4ca 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -40,8 +40,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
-import org.apache.accumulo.core.file.blockfile.cache.LruBlockCache;
-import org.apache.accumulo.core.file.blockfile.cache.TinyLfuBlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheFactory;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.NamingThreadFactory;
@@ -175,17 +174,15 @@ public class TabletServerResourceManager {
     long sCacheSize = acuConf.getAsBytes(Property.TSERV_SUMMARYCACHE_SIZE);
     long totalQueueSize = 
acuConf.getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
 
-    String policy = acuConf.get(Property.TSERV_CACHE_POLICY);
-    if (policy.equalsIgnoreCase("LRU")) {
-      _iCache = new LruBlockCache(iCacheSize, blockSize);
-      _dCache = new LruBlockCache(dCacheSize, blockSize);
-      _sCache = new LruBlockCache(sCacheSize, blockSize);
-    } else if (policy.equalsIgnoreCase("TinyLFU")) {
-      _iCache = new TinyLfuBlockCache(iCacheSize, blockSize);
-      _dCache = new TinyLfuBlockCache(dCacheSize, blockSize);
-      _sCache = new TinyLfuBlockCache(sCacheSize, blockSize);
-    } else {
-      throw new IllegalArgumentException("Unknown Block cache policy " + 
policy);
+    try {
+      _iCache = BlockCacheFactory.getBlockCache(acuConf);
+      _iCache.start(acuConf, iCacheSize, blockSize);
+      _dCache = BlockCacheFactory.getBlockCache(acuConf);
+      _dCache.start(acuConf, dCacheSize, blockSize);
+      _sCache = BlockCacheFactory.getBlockCache(acuConf);
+      _sCache.start(acuConf, sCacheSize, blockSize);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Error constructing block cache", e);
     }
 
     Runtime runtime = Runtime.getRuntime();
@@ -543,6 +540,10 @@ public class TabletServerResourceManager {
       executorService.shutdown();
     }
 
+    this._dCache.stop();
+    this._iCache.stop();
+    this._sCache.stop();
+
     for (Entry<String,ExecutorService> entry : threadPools.entrySet()) {
       while (true) {
         try {

Reply via email to