Repository: accumulo
Updated Branches:
  refs/heads/IGNITE [created] 2e992c889


Branch off ACCUMULO-4463 to start working on Apache Ignite block cache


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2e992c88
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2e992c88
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2e992c88

Branch: refs/heads/IGNITE
Commit: 2e992c8896d71921513102d294179f4782b51dc2
Parents: 61dd036
Author: Dave Marion <[email protected]>
Authored: Fri May 12 16:09:22 2017 -0400
Committer: Dave Marion <[email protected]>
Committed: Fri May 12 16:09:22 2017 -0400

----------------------------------------------------------------------
 core/pom.xml                                    |  4 +
 .../cache/tiered/TieredBlockCache.java          | 97 ++++++++++++++++++++
 .../tiered/TieredBlockCacheConfiguration.java   | 40 ++++++++
 .../cache/tiered/TieredBlockCacheManager.java   | 79 ++++++++++++++++
 .../blockfile/cache/TestTieredBlockCache.java   | 75 +++++++++++++++
 pom.xml                                         |  6 ++
 6 files changed, 301 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index aec7f66..f524c71 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -96,6 +96,10 @@
       <artifactId>htrace-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.ignite</groupId>
+      <artifactId>ignite-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
new file mode 100644
index 0000000..6031666
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCache.java
@@ -0,0 +1,97 @@
+package org.apache.accumulo.core.file.blockfile.cache.tiered;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TieredBlockCache implements BlockCache {
+       
+       public static final class Block implements CacheEntry {
+         private volatile byte[] buffer;
+         private volatile Object index;
+
+         public Block(byte[] buffer) {
+           this.buffer = requireNonNull(buffer);
+         }
+
+         @Override
+         public byte[] getBuffer() {
+           return buffer;
+         }
+
+         @Override
+         public Object getIndex() {
+           return index;
+         }
+
+           @Override
+         public void setIndex(Object index) {
+           this.index = index;
+      }
+       }
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TieredBlockCache.class);
+       private final IgniteCache<String, Block> cache;
+       private final TieredBlockCacheConfiguration conf;
+       private final AtomicLong hitCount = new AtomicLong(0);
+       private final AtomicLong requestCount = new AtomicLong(0);
+
+       
+       public TieredBlockCache(TieredBlockCacheConfiguration conf, Ignite 
ignite) {
+               this.conf = conf;
+               this.cache = ignite.getOrCreateCache(conf.getConfiguration());
+               LOG.info("Created {} cache with configuration {}", 
+                               conf.getConfiguration().getName(), 
conf.getConfiguration());
+       }
+       
+       public void stop() {
+               this.cache.close();
+       }
+       
+       @Override
+       public CacheEntry cacheBlock(String blockName, byte[] buf, boolean 
inMemory) {
+               return cacheBlock(blockName, buf);
+       }
+       
+       @Override
+       public CacheEntry cacheBlock(String blockName, byte[] buf) {
+               return this.cache.getAndPutIfAbsent(blockName, new Block(buf));
+       }
+
+       @Override
+       public CacheEntry getBlock(String blockName) {
+               this.requestCount.incrementAndGet();
+               Block b = this.cache.get(blockName);
+               if (null != b) {
+                       this.hitCount.incrementAndGet();
+               }
+               return b;
+       }
+
+       @Override
+       public long getMaxSize() {
+               return this.conf.getMaxSize();
+       }
+
+       @Override
+       public Stats getStats() {
+               return new Stats() {
+                       @Override
+                       public long hitCount() {
+                               return hitCount.get();
+                       }
+                       @Override
+                       public long requestCount() {
+                               return requestCount.get();
+                       }
+               };
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
new file mode 100644
index 0000000..5f2cde5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheConfiguration.java
@@ -0,0 +1,40 @@
+package org.apache.accumulo.core.file.blockfile.cache.tiered;
+
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import 
org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache.Block;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.eviction.lru.LruEvictionPolicy;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+public class TieredBlockCacheConfiguration extends BlockCacheConfiguration {
+       
+       private final CacheConfiguration<String, Block> configuration;
+
+       public TieredBlockCacheConfiguration(AccumuloConfiguration conf, 
CacheType type) {
+         super(conf, type, TieredBlockCacheManager.PROPERTY_PREFIX);
+
+         configuration = new CacheConfiguration<>();
+         configuration.setName(type.name());
+         configuration.setCacheMode(CacheMode.LOCAL);
+         configuration.setOnheapCacheEnabled(true);
+         configuration.setEvictionPolicy(new LruEvictionPolicy<String, 
Block>((int) this.getMaxSize()));
+         
configuration.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(Duration.ONE_HOUR));
+       }
+
+       public CacheConfiguration<String, Block> getConfiguration() {
+         return configuration;
+       }
+
+       @Override
+       public String toString() {
+         return this.configuration.toString();
+       }
+       
+       
+       
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
new file mode 100644
index 0000000..5ea8a80
--- /dev/null
+++ 
b/core/src/main/java/org/apache/accumulo/core/file/blockfile/cache/tiered/TieredBlockCacheManager.java
@@ -0,0 +1,79 @@
+package org.apache.accumulo.core.file.blockfile.cache.tiered;
+
+import java.util.Optional;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCacheManager;
+import org.apache.accumulo.core.file.blockfile.cache.CacheType;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TieredBlockCacheManager extends BlockCacheManager {
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(TieredBlockCacheManager.class);
+       
+       public static final String PROPERTY_PREFIX = "tiered";
+       
+       private static final String TIERED_PROPERTY_BASE = 
BlockCacheManager.CACHE_PROPERTY_BASE + PROPERTY_PREFIX + ".";
+       public static final String OFF_HEAP_MAX_SIZE_PROPERTY = 
TIERED_PROPERTY_BASE + "off-heap.max.size";
+       public static final String OFF_HEAP_BLOCK_SIZE_PROPERTY = 
TIERED_PROPERTY_BASE + "off-heap.block.size";
+       
+       private static final long OFF_HEAP_MAX_SIZE_DEFAULT = 512 * 1024 * 1024;
+       private static final int OFF_HEAP_BLOCK_SIZE_DEFAULT = 16 * 1024;
+
+       private Ignite IGNITE;
+       
+       @Override
+       public void start(AccumuloConfiguration conf) {
+               
+               final long offHeapMaxSize = 
Optional.ofNullable(conf.get(OFF_HEAP_MAX_SIZE_PROPERTY)).map(Long::valueOf).filter(f
 -> f > 0).orElse(OFF_HEAP_MAX_SIZE_DEFAULT);
+               final int offHeapBlockSize = 
Optional.ofNullable(conf.get(OFF_HEAP_BLOCK_SIZE_PROPERTY)).map(Integer::valueOf).filter(f
 -> f > 0).orElse(OFF_HEAP_BLOCK_SIZE_DEFAULT);
+
+               // Ignite configuration.
+               IgniteConfiguration cfg = new IgniteConfiguration();
+               cfg.setDaemon(true);
+               
+               // Global Off-Heap Page memory configuration.
+               MemoryConfiguration memCfg = new MemoryConfiguration();
+               memCfg.setPageSize(offHeapBlockSize);
+               
+               MemoryPolicyConfiguration plCfg = new 
MemoryPolicyConfiguration();
+               plCfg.setInitialSize(offHeapMaxSize);
+               plCfg.setMaxSize(offHeapMaxSize);
+               plCfg.setPageEvictionMode(DataPageEvictionMode.RANDOM_2_LRU);
+               plCfg.setEvictionThreshold(0.9);
+
+               memCfg.setMemoryPolicies(plCfg); //apply custom memory policy
+               
+               cfg.setMemoryConfiguration(memCfg); // apply off-heap memory 
configuration
+               LOG.info("Starting Ignite with configuration {}", 
cfg.toString());
+               IGNITE = Ignition.start(cfg);
+
+               super.start(conf);
+       }
+
+       @Override
+       public void stop() {
+               for (CacheType type : CacheType.values()) {
+                 TieredBlockCache cache = (TieredBlockCache) 
this.getBlockCache(type);
+                 if (null != cache) {
+                       cache.stop();
+                 }
+               }
+               IGNITE.close();
+               super.stop();
+       }
+
+       @Override
+       protected BlockCache createCache(AccumuloConfiguration conf, CacheType 
type) {
+               return new TieredBlockCache(new 
TieredBlockCacheConfiguration(conf, type), IGNITE);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
 
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
new file mode 100644
index 0000000..3149438
--- /dev/null
+++ 
b/core/src/test/java/org/apache/accumulo/core/file/blockfile/cache/TestTieredBlockCache.java
@@ -0,0 +1,75 @@
+package org.apache.accumulo.core.file.blockfile.cache;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCache;
+import 
org.apache.accumulo.core.file.blockfile.cache.tiered.TieredBlockCacheManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTieredBlockCache {
+       
+       private static final long BLOCKSIZE = 1024;
+       private static final long MAXSIZE = 1024*100;
+       
+       private static class Holder {
+               private final String name;
+               private final byte[] buf;
+               public Holder(String name, byte[] buf) {
+                       super();
+                       this.name = name;
+                       this.buf = buf;
+               }
+               public String getName() {
+                       return name;
+               }
+               public byte[] getBuf() {
+                       return buf;
+               }
+       }
+       
+       @Test
+       public void testCacheCreation() throws Exception {
+           DefaultConfiguration dc = new DefaultConfiguration();
+           ConfigurationCopy cc = new ConfigurationCopy(dc);
+           cc.set(Property.TSERV_CACHE_FACTORY_IMPL, 
TieredBlockCacheManager.class.getName());
+           BlockCacheManager manager = BlockCacheManager.getInstance(cc);
+           cc.set(Property.TSERV_DEFAULT_BLOCKSIZE, Long.toString(BLOCKSIZE));
+           cc.set(Property.TSERV_DATACACHE_SIZE, Long.toString(MAXSIZE));
+           cc.set(Property.TSERV_INDEXCACHE_SIZE, Long.toString(MAXSIZE));
+           cc.set(Property.TSERV_SUMMARYCACHE_SIZE, Long.toString(MAXSIZE));
+           manager.start(cc);
+           TieredBlockCache cache = (TieredBlockCache) 
manager.getBlockCache(CacheType.DATA);
+           Holder[] blocks = generateRandomBlocks(100, BLOCKSIZE);
+           // Confirm empty
+           for (Holder h : blocks) {
+             Assert.assertNull(cache.getBlock(h.getName()));
+           }
+           // Add blocks
+           for (Holder h : blocks) {
+             cache.cacheBlock(h.getName(), h.getBuf());
+           }
+           // Check if all blocks are properly cached and retrieved
+           for (Holder h : blocks) {
+             CacheEntry ce = cache.getBlock(h.getName());
+             Assert.assertTrue(ce != null);
+             Assert.assertEquals(ce.getBuffer().length, h.getBuf().length);
+           }
+
+           manager.stop();
+       }
+       
+    private Holder[] generateRandomBlocks(int numBlocks, long maxSize) {
+      Holder[] blocks = new Holder[numBlocks];
+      Random r = new Random();
+      for (int i = 0; i < numBlocks; i++) {
+        blocks[i] = new Holder("block" + i, Integer.toString(r.nextInt((int) 
maxSize) + 1).getBytes(StandardCharsets.UTF_8));
+      }
+      return blocks;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2e992c88/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 78a5712..4a37b51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,7 @@
     <hadoop.version>2.6.4</hadoop.version>
     <htrace.version>3.1.0-incubating</htrace.version>
     <httpclient.version>4.3.1</httpclient.version>
+    <ignite.version>2.0.0</ignite.version>
     <it.failIfNoSpecifiedTests>false</it.failIfNoSpecifiedTests>
     <!-- jetty 9.2 is the last version to support jdk less than 1.8 -->
     <jetty.version>9.2.17.v20160517</jetty.version>
@@ -432,6 +433,11 @@
         <version>${httpclient.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-core</artifactId>
+        <version>${ignite.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.maven</groupId>
         <artifactId>maven-artifact</artifactId>
         <version>${maven.min-version}</version>

Reply via email to