Repository: hbase
Updated Branches:
  refs/heads/master ba6345f7d -> a4a235b8d


HBASE-13170 Allow block cache to be external

Summary: Add MemcachedBlockCache

Test Plan: Tested locally with PE and running memcached.

Subscribers: rajesh.nishtala, ndimiduk

Differential Revision: https://reviews.facebook.net/D34635


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a4a235b8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a4a235b8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a4a235b8

Branch: refs/heads/master
Commit: a4a235b8d131a04b7ba6e411271ef8f519f4a6c6
Parents: ba6345f
Author: Elliott Clark <ecl...@apache.org>
Authored: Fri Mar 27 13:15:27 2015 -0700
Committer: Elliott Clark <ecl...@apache.org>
Committed: Fri Mar 27 13:15:27 2015 -0700

----------------------------------------------------------------------
 hbase-server/pom.xml                            |   5 +
 .../tmpl/regionserver/BlockCacheTmpl.jamon      |  62 +++--
 .../hadoop/hbase/io/hfile/CacheConfig.java      |  82 +++++-
 .../hbase/io/hfile/CombinedBlockCache.java      |  47 ++--
 .../hadoop/hbase/io/hfile/HFileBlock.java       |  30 +-
 .../io/hfile/InclusiveCombinedBlockCache.java   |  58 ++++
 .../hadoop/hbase/io/hfile/LruBlockCache.java    |  37 ++-
 .../hbase/io/hfile/MemcachedBlockCache.java     | 272 +++++++++++++++++++
 .../hadoop/hbase/io/hfile/TestCacheConfig.java  |   4 +-
 pom.xml                                         |   7 +
 10 files changed, 521 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/hbase-server/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 67a1e20..76bc960 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -490,6 +490,11 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.spy</groupId>
+      <artifactId>spymemcached</artifactId>
+      <optional>true</optional>
+    </dependency>
     <!-- tracing Dependencies -->
     <dependency>
       <groupId>org.apache.htrace</groupId>

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
index e4ff70f..0419196 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/regionserver/BlockCacheTmpl.jamon
@@ -206,6 +206,38 @@ org.apache.hadoop.util.StringUtils;
 </%if>
 </%def>
 
+<%def hits_tmpl>
+<%args>
+    BlockCache bc;
+</%args>
+    <tr>
+        <td>Hits</td>
+        <td><% String.format("%,d", bc.getStats().getHitCount()) %></td>
+        <td>Number of requests that were cache hits</td>
+    </tr>
+    <tr>
+        <td>Hits Caching</td>
+        <td><% String.format("%,d", bc.getStats().getHitCachingCount()) %></td>
+        <td>Cache hit block requests but only requests set to cache block if a miss</td>
+    </tr>
+    <tr>
+        <td>Misses</td>
+        <td><% String.format("%,d", bc.getStats().getMissCount()) %></td>
+        <td>Block requests that were cache misses but set to cache missed blocks</td>
+    </tr>
+    <tr>
+        <td>Misses Caching</td>
+        <td><% String.format("%,d", bc.getStats().getHitCachingCount()) %></td>
+        <td>Block requests that were cache misses but only requests set to use block cache</td>
+    </tr>
+    <tr>
+        <td>Hit Ratio</td>
+        <td><% String.format("%,.2f", bc.getStats().getHitRatio() * 100) %><% 
"%" %></td>
+        <td>Hit Count divided by total requests count</td>
+    </tr>
+
+</%def>
+
 <%def bc_stats>
 <%args>
     CacheConfig cacheConfig;
@@ -235,31 +267,7 @@ org.apache.hadoop.util.StringUtils;
         <td>Number of blocks in block cache</td>
     </tr>
     <& evictions_tmpl; bc = cacheConfig.getBlockCache(); &>
-    <tr>
-        <td>Hits</td>
-        <td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getHitCount()) %></td>
-        <td>Number requests that were cache hits</td>
-    </tr>
-    <tr>
-        <td>Hits Caching</td>
-        <td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getHitCachingCount()) %></td>
-        <td>Cache hit block requests but only requests set to cache block if a miss</td>
-    </tr>
-    <tr>
-        <td>Misses</td>
-        <td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getMissCount()) %></td>
-        <td>Block requests that were cache misses but set to cache missed blocks</td>
-    </tr>
-    <tr>
-        <td>Misses Caching</td>
-        <td><% String.format("%,d", cacheConfig.getBlockCache().getStats().getMissCount()) %></td>
-        <td>Block requests that were cache misses but only requests set to use block cache</td>
-    </tr>
-    <tr>
-        <td>Hit Ratio</td>
-        <td><% String.format("%,.2f", cacheConfig.getBlockCache().getStats().getHitRatio() * 100) %><% "%" %></td>
-        <td>Hit Count divided by total requests count</td>
-    </tr>
+    <& hits_tmpl; bc = cacheConfig.getBlockCache(); &>
 </table>
 <p>If block cache is made up of more than one cache -- i.e. a L1 and a L2 -- then the above
 are combined counts. Request count is sum of hits and misses.</p>
@@ -349,7 +357,9 @@ are combined counts. Request count is sum of hits and misses.</p>
         <td>Size of DATA Blocks</td>
     </tr>
 </%if> 
-<%if evictions %><& evictions_tmpl; bc = bc; &></%if> 
+<& evictions_tmpl; bc = bc; &>
+<& hits_tmpl; bc = bc; &>
+
 <%if bucketCache %>
     <tr>
         <td>Hits per Second</td>

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index f212f14..5d221c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -126,8 +127,25 @@ public class CacheConfig {
    */
   public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize";
 
-  // Defaults
+  private static final String EXTERNAL_BLOCKCACHE_KEY = "hbase.blockcache.use.external";
+  private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false;
+
+  private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY = "hbase.blockcache.external.class";
+
+  /**
+   * Enum of all built in external block caches.
+   * This is used for config.
+   */
+  private static enum ExternalBlockCaches {
+    memcached(MemcachedBlockCache.class);
+    // TODO(eclark): Consider more. Redis, etc.
+    Class<? extends BlockCache> clazz;
+    ExternalBlockCaches(Class<? extends BlockCache> clazz) {
+      this.clazz = clazz;
+    }
+  }
 
+  // Defaults
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
   public static final boolean DEFAULT_CACHE_DATA_ON_WRITE = false;
   public static final boolean DEFAULT_IN_MEMORY = false;
@@ -478,7 +496,44 @@ public class CacheConfig {
    * @return Returns L2 block cache instance (for now it is BucketCache BlockCache all the time)
    * or null if not supposed to be a L2.
    */
-  private static BucketCache getL2(final Configuration c, final MemoryUsage mu) {
+  private static BlockCache getL2(final Configuration c, final MemoryUsage mu) {
+    final boolean useExternal = c.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Trying to use " + (useExternal?" External":" Internal") + " 
l2 cache");
+    }
+
+    // If we want to use an external block cache then create that.
+    if (useExternal) {
+      return getExternalBlockcache(c);
+    }
+
+    // otherwise use the bucket cache.
+    return getBucketCache(c, mu);
+
+  }
+
+  private static BlockCache getExternalBlockcache(Configuration c) {
+    Class klass = null;
+
+    // Get the class from the config.
+    try {
+      klass = ExternalBlockCaches.valueOf(c.get(EXTERNAL_BLOCKCACHE_CLASS_KEY, "memcached")).clazz;
+    } catch (IllegalArgumentException exception) {
+      klass = c.getClass(EXTERNAL_BLOCKCACHE_CLASS_KEY, MemcachedBlockCache.class);
+    }
+
+    // Now try and create an instance of the block cache.
+    try {
+      LOG.info("Creating external block cache of type: " + klass);
+      return (BlockCache) ReflectionUtils.newInstance(klass, c);
+    } catch (Exception e) {
+      LOG.warn("Error creating external block cache", e);
+    }
+    return null;
+
+  }
+
+  private static BlockCache getBucketCache(Configuration c, MemoryUsage mu) {
     // Check for L2.  ioengine name must be non-null.
     String bucketCacheIOEngineName = c.get(BUCKET_CACHE_IOENGINE_KEY, null);
     if (bucketCacheIOEngineName == null || bucketCacheIOEngineName.length() <= 0) return null;
@@ -533,22 +588,27 @@ public class CacheConfig {
     LruBlockCache l1 = getL1(conf, mu);
     // blockCacheDisabled is set as a side-effect of getL1(), so check it again after the call.
     if (blockCacheDisabled) return null;
-    BucketCache l2 = getL2(conf, mu);
+    BlockCache l2 = getL2(conf, mu);
     if (l2 == null) {
       GLOBAL_BLOCK_CACHE_INSTANCE = l1;
     } else {
+      boolean useExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);
       boolean combinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,
         DEFAULT_BUCKET_CACHE_COMBINED);
-      if (combinedWithLru) {
-        GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
+      if (useExternal) {
+        GLOBAL_BLOCK_CACHE_INSTANCE = new InclusiveCombinedBlockCache(l1, l2);
       } else {
-        // L1 and L2 are not 'combined'.  They are connected via the LruBlockCache victimhandler
-        // mechanism.  It is a little ugly but works according to the following: when the
-        // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
-        // a block from the L1 cache, if not in L1, we will search L2.
-        l1.setVictimCache(l2);
-        GLOBAL_BLOCK_CACHE_INSTANCE = l1;
+        if (combinedWithLru) {
+          GLOBAL_BLOCK_CACHE_INSTANCE = new CombinedBlockCache(l1, l2);
+        } else {
+          // L1 and L2 are not 'combined'.  They are connected via the LruBlockCache victimhandler
+          // mechanism.  It is a little ugly but works according to the following: when the
+          // background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get
+          // a block from the L1 cache, if not in L1, we will search L2.
+          GLOBAL_BLOCK_CACHE_INSTANCE = l1;
+        }
       }
+      l1.setVictimCache(l2);
     }
     return GLOBAL_BLOCK_CACHE_INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index 52a5793..7725cf9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -25,32 +25,37 @@ import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 
+
 /**
  * CombinedBlockCache is an abstraction layer that combines
  * {@link LruBlockCache} and {@link BucketCache}. The smaller lruCache is used
- * to cache bloom blocks and index blocks.  The larger bucketCache is used to
+ * to cache bloom blocks and index blocks.  The larger l2Cache is used to
 * cache data blocks. {@link #getBlock(BlockCacheKey, boolean, boolean, boolean)} reads
- * first from the smaller lruCache before looking for the block in the bucketCache.  Blocks evicted
+ * first from the smaller lruCache before looking for the block in the l2Cache.  Blocks evicted
  * from lruCache are put into the bucket cache. 
  * Metrics are the combined size and hits and misses of both caches.
  * 
  */
 @InterfaceAudience.Private
 public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
-  private final LruBlockCache lruCache;
-  private final BucketCache bucketCache;
-  private final CombinedCacheStats combinedCacheStats;
+  protected final LruBlockCache lruCache;
+  protected final BlockCache l2Cache;
+  protected final CombinedCacheStats combinedCacheStats;
 
-  public CombinedBlockCache(LruBlockCache lruCache, BucketCache bucketCache) {
+  public CombinedBlockCache(LruBlockCache lruCache, BlockCache l2Cache) {
     this.lruCache = lruCache;
-    this.bucketCache = bucketCache;
+    this.l2Cache = l2Cache;
     this.combinedCacheStats = new CombinedCacheStats(lruCache.getStats(),
-        bucketCache.getStats());
+        l2Cache.getStats());
   }
 
   @Override
   public long heapSize() {
-    return lruCache.heapSize() + bucketCache.heapSize();
+    long l2size = 0;
+    if (l2Cache instanceof HeapSize) {
+      l2size = ((HeapSize) l2Cache).heapSize();
+    }
+    return lruCache.heapSize() + l2size;
   }
 
   @Override
@@ -60,7 +65,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
     if (isMetaBlock || cacheDataInL1) {
       lruCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
     } else {
-      bucketCache.cacheBlock(cacheKey, buf, inMemory, cacheDataInL1);
+      l2Cache.cacheBlock(cacheKey, buf, inMemory, false);
     }
   }
 
@@ -73,22 +78,24 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
   public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
       boolean repeat, boolean updateCacheMetrics) {
     // TODO: is there a hole here, or just awkwardness since in the lruCache getBlock
-    // we end up calling bucketCache.getBlock.
+    // we end up calling l2Cache.getBlock.
     if (lruCache.containsBlock(cacheKey)) {
       return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
     }
-    return bucketCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+    Cacheable result = l2Cache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+
+    return result;
   }
 
   @Override
   public boolean evictBlock(BlockCacheKey cacheKey) {
-    return lruCache.evictBlock(cacheKey) || bucketCache.evictBlock(cacheKey);
+    return lruCache.evictBlock(cacheKey) || l2Cache.evictBlock(cacheKey);
   }
 
   @Override
   public int evictBlocksByHfileName(String hfileName) {
     return lruCache.evictBlocksByHfileName(hfileName)
-        + bucketCache.evictBlocksByHfileName(hfileName);
+        + l2Cache.evictBlocksByHfileName(hfileName);
   }
 
   @Override
@@ -99,27 +106,27 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
   @Override
   public void shutdown() {
     lruCache.shutdown();
-    bucketCache.shutdown();
+    l2Cache.shutdown();
   }
 
   @Override
   public long size() {
-    return lruCache.size() + bucketCache.size();
+    return lruCache.size() + l2Cache.size();
   }
 
   @Override
   public long getFreeSize() {
-    return lruCache.getFreeSize() + bucketCache.getFreeSize();
+    return lruCache.getFreeSize() + l2Cache.getFreeSize();
   }
 
   @Override
   public long getCurrentSize() {
-    return lruCache.getCurrentSize() + bucketCache.getCurrentSize();
+    return lruCache.getCurrentSize() + l2Cache.getCurrentSize();
   }
 
   @Override
   public long getBlockCount() {
-    return lruCache.getBlockCount() + bucketCache.getBlockCount();
+    return lruCache.getBlockCount() + l2Cache.getBlockCount();
   }
 
   private static class CombinedCacheStats extends CacheStats {
@@ -205,7 +212,7 @@ public class CombinedBlockCache implements ResizableBlockCache, HeapSize {
 
   @Override
   public BlockCache[] getBlockCaches() {
-    return new BlockCache [] {this.lruCache, this.bucketCache};
+    return new BlockCache [] {this.lruCache, this.l2Cache};
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 8f82a63..4115941 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -116,7 +116,7 @@ public class HFileBlock implements Cacheable {
    */
   static final int CHECKSUM_SIZE = Bytes.SIZEOF_INT;
 
-  private static final CacheableDeserializer<Cacheable> blockDeserializer =
+  static final CacheableDeserializer<Cacheable> blockDeserializer =
       new CacheableDeserializer<Cacheable>() {
        public HFileBlock deserialize(ByteBuffer buf, boolean reuse) throws IOException{
          buf.limit(buf.limit() - HFileBlock.EXTRA_SERIALIZATION_SPACE).rewind();
@@ -130,13 +130,13 @@ public class HFileBlock implements Cacheable {
           buf.position(buf.limit());
           buf.limit(buf.limit() + HFileBlock.EXTRA_SERIALIZATION_SPACE);
           boolean usesChecksum = buf.get() == (byte)1;
-          HFileBlock ourBuffer = new HFileBlock(newByteBuffer, usesChecksum);
-          ourBuffer.offset = buf.getLong();
-          ourBuffer.nextBlockOnDiskSizeWithHeader = buf.getInt();
-          if (ourBuffer.hasNextBlockHeader()) {
-            ourBuffer.buf.limit(ourBuffer.buf.limit() - ourBuffer.headerSize());
+          HFileBlock hFileBlock = new HFileBlock(newByteBuffer, usesChecksum);
+          hFileBlock.offset = buf.getLong();
+          hFileBlock.nextBlockOnDiskSizeWithHeader = buf.getInt();
+          if (hFileBlock.hasNextBlockHeader()) {
+            hFileBlock.buf.limit(hFileBlock.buf.limit() - hFileBlock.headerSize());
           }
-          return ourBuffer;
+          return hFileBlock;
         }
 
         @Override
@@ -670,7 +670,7 @@ public class HFileBlock implements Cacheable {
    * @return true if succeeded reading the extra bytes
    * @throws IOException if failed to read the necessary bytes
    */
-  public static boolean readWithExtra(InputStream in, byte buf[],
+  public static boolean readWithExtra(InputStream in, byte[] buf,
       int bufOffset, int necessaryLen, int extraLen) throws IOException {
     int bytesRemaining = necessaryLen + extraLen;
     while (bytesRemaining > 0) {
@@ -776,7 +776,8 @@ public class HFileBlock implements Cacheable {
     /**
      * Valid in the READY state. Contains the header and the uncompressed (but
      * potentially encoded, if this is a data block) bytes, so the length is
-     * {@link #uncompressedSizeWithoutHeader} + {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
+     * {@link #uncompressedSizeWithoutHeader} +
+     * {@link org.apache.hadoop.hbase.HConstants#HFILEBLOCK_HEADER_SIZE}.
      * Does not store checksums.
      */
     private byte[] uncompressedBytesWithHeader;
@@ -1059,7 +1060,9 @@ public class HFileBlock implements Cacheable {
      */
     int getOnDiskSizeWithoutHeader() {
       expectState(State.BLOCK_READY);
-      return onDiskBytesWithHeader.length + onDiskChecksum.length - HConstants.HFILEBLOCK_HEADER_SIZE;
+      return onDiskBytesWithHeader.length
+          + onDiskChecksum.length
+          - HConstants.HFILEBLOCK_HEADER_SIZE;
     }
 
     /**
@@ -1832,7 +1835,8 @@ public class HFileBlock implements Cacheable {
     if (!fileContext.isUseHBaseChecksum() || this.fileContext.getBytesPerChecksum() == 0) {
       return 0;
     }
-    return (int)ChecksumUtil.numBytes(onDiskDataSizeWithHeader, this.fileContext.getBytesPerChecksum());
+    return (int) ChecksumUtil.numBytes(onDiskDataSizeWithHeader,
+        this.fileContext.getBytesPerChecksum());
   }
 
   /**
@@ -1886,8 +1890,8 @@ public class HFileBlock implements Cacheable {
     byte[] magicBuf = new byte[Math.min(buf.limit() - buf.position(), BlockType.MAGIC_LENGTH)];
     buf.get(magicBuf);
     BlockType bt = BlockType.parse(magicBuf, 0, BlockType.MAGIC_LENGTH);
-    int compressedBlockSizeNoHeader = buf.getInt();;
-    int uncompressedBlockSizeNoHeader = buf.getInt();;
+    int compressedBlockSizeNoHeader = buf.getInt();
+    int uncompressedBlockSizeNoHeader = buf.getInt();
     long prevBlockOffset = buf.getLong();
     byte cksumtype = buf.get();
     long bytesPerChecksum = buf.getInt();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java
new file mode 100644
index 0000000..667e7b4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/InclusiveCombinedBlockCache.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class InclusiveCombinedBlockCache extends CombinedBlockCache implements BlockCache {
+  public InclusiveCombinedBlockCache(LruBlockCache l1, BlockCache l2) {
+    super(l1,l2);
+  }
+
+  @Override
+  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
+                            boolean repeat, boolean updateCacheMetrics) {
+    // On all external cache setups the lru should have the l2 cache set as the victimHandler.
+    // Because of that, all requests that miss in the lru block cache will be
+    // tried in the l2 block cache.
+    return lruCache.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+  }
+
+  /**
+   *
+   * @param cacheKey The block's cache key.
+   * @param buf The block contents wrapped in a ByteBuffer.
+   * @param inMemory Whether block should be treated as in-memory. This parameter is only useful for
+   *                 the L1 lru cache.
+   * @param cacheDataInL1 This is totally ignored.
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory,
+                         final boolean cacheDataInL1) {
+    // This is the inclusive part of the combined block cache.
+    // Every block is placed into both block caches.
+    lruCache.cacheBlock(cacheKey, buf, inMemory, true);
+
+    // This assumes that insertion into the L2 block cache is either async or very fast.
+    l2Cache.cacheBlock(cacheKey, buf, inMemory, true);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index 82df5f7..bf46bcf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -197,8 +197,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
   private boolean forceInMemory;
 
   /** Where to send victims (blocks evicted/missing from the cache) */
-  // TODO: Fix it so this is not explicit reference to a particular BlockCache implementation.
-  private BucketCache victimHandler = null;
+  private BlockCache victimHandler = null;
 
   /**
    * Default constructor.  Specify maximum size and expected average block
@@ -419,8 +418,17 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     LruCachedBlock cb = map.get(cacheKey);
     if (cb == null) {
       if (!repeat && updateCacheMetrics) stats.miss(caching);
-      if (victimHandler != null) {
-        return victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+      // If there is another block cache then try and read there.
+      // However, if this is a retry (the second time in double-checked locking)
+      // and it's already a miss, then the L2 will also be a miss.
+      if (victimHandler != null && !repeat) {
+        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);
+
+        // Promote this to L1.
+        if (result != null && caching) {
+          cacheBlock(cacheKey, result, /* inMemory = */ false, /* cacheData = */ true);
+        }
+        return result;
       }
       return null;
     }
@@ -489,10 +497,14 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     }
     stats.evicted(block.getCachedTime());
     if (evictedByEvictionProcess && victimHandler != null) {
-      boolean wait = getCurrentSize() < acceptableSize();
-      boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
-      victimHandler.cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
-          inMemory, wait);
+      if (victimHandler instanceof BucketCache) {
+        boolean wait = getCurrentSize() < acceptableSize();
+        boolean inMemory = block.getPriority() == BlockPriority.MEMORY;
+        ((BucketCache)victimHandler).cacheBlockWithWait(block.getCacheKey(), block.getBuffer(),
+            inMemory, wait);
+      } else {
+        victimHandler.cacheBlock(block.getCacheKey(), block.getBuffer());
+      }
     }
     return block.heapSize();
   }
@@ -787,7 +799,10 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
         synchronized(this) {
           try {
             this.wait(1000 * 10/*Don't wait for ever*/);
-          } catch(InterruptedException e) {}
+          } catch(InterruptedException e) {
+            LOG.warn("Interrupted eviction thread ", e);
+            Thread.currentThread().interrupt();
+          }
         }
         LruBlockCache cache = this.cache.get();
         if (cache == null) break;
@@ -1057,7 +1072,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     return counts;
   }
 
-  public void setVictimCache(BucketCache handler) {
+  public void setVictimCache(BlockCache handler) {
     assert victimHandler == null;
     victimHandler = handler;
   }
@@ -1067,7 +1082,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
     return map;
   }
 
-  BucketCache getVictimHandler() {
+  BlockCache getVictimHandler() {
     return this.victimHandler;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
new file mode 100644
index 0000000..57e7f28
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/MemcachedBlockCache.java
@@ -0,0 +1,272 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hfile;
+
+import net.spy.memcached.CachedData;
+import net.spy.memcached.ConnectionFactoryBuilder;
+import net.spy.memcached.FailureMode;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.transcoders.Transcoder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Class to store blocks into memcached.
+ * This should only be used on a cluster of Memcached daemons that are tuned well and have a
+ * good network connection to the HBase regionservers. Any other use will likely slow down HBase
+ * greatly.
+ */
+@InterfaceAudience.Private
+public class MemcachedBlockCache implements BlockCache {
+  private static final Log LOG = LogFactory.getLog(MemcachedBlockCache.class.getName());
+
+  // Some memcached versions won't take more than 1024 * 1024. So set the limit below
+  // that just in case this client is used with those versions.
+  public static final int MAX_SIZE = 1020 * 1024;
+
+  // Config key for what memcached servers to use.
+  // They should be specified in a comma-separated list with ports.
+  // like:
+  //
+  // host1:11211,host3:8080,host4:11211
+  public static final String MEMCACHED_CONFIG_KEY = "hbase.cache.memcached.servers";
+  public static final String MEMCACHED_TIMEOUT_KEY = "hbase.cache.memcached.timeout";
+  public static final String MEMCACHED_OPTIMEOUT_KEY = "hbase.cache.memcached.optimeout";
+  public static final long MEMCACHED_DEFAULT_TIMEOUT = 500;
+
+  private final MemcachedClient client;
+  private final HFileBlockTranscoder tc = new HFileBlockTranscoder();
+  private final CacheStats cacheStats = new CacheStats("MemcachedBlockCache");
+
+  public MemcachedBlockCache(Configuration c) throws IOException {
+    LOG.info("Creating MemcachedBlockCache");
+
+    long opTimeout = c.getLong(MEMCACHED_OPTIMEOUT_KEY, MEMCACHED_DEFAULT_TIMEOUT);
+    long queueTimeout = c.getLong(MEMCACHED_TIMEOUT_KEY, opTimeout + MEMCACHED_DEFAULT_TIMEOUT);
+
+    ConnectionFactoryBuilder builder = new ConnectionFactoryBuilder()
+        .setOpTimeout(opTimeout)
+        .setOpQueueMaxBlockTime(queueTimeout) // Cap the max time before anything times out
+        .setFailureMode(FailureMode.Redistribute)
+        .setShouldOptimize(true)              // When regions move lots of reads happen together
+                                              // So combining them into single requests is nice.
+        .setDaemon(true)                      // Don't keep threads around past the end of days.
+        .setUseNagleAlgorithm(false)          // Ain't nobody got time for that
+        .setReadBufferSize(HConstants.DEFAULT_BLOCKSIZE * 4 * 1024);  // 4 times larger than the
+                                                                      // default block just in case
+
+
+    // Assume only the localhost is serving memcached,
+    // a la mcrouter or co-locating memcached with split regionservers.
+    //
+    // If this config is a pool of memcached servers they will all be used according to the
+    // default hashing scheme defined by the memcached client -- the spymemcached client in
+    // this case.
+    String serverListString = c.get(MEMCACHED_CONFIG_KEY, "localhost:11211");
+    String[] servers = serverListString.split(",");
+    List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(servers.length);
+    for (String s : servers) {
+      serverAddresses.add(Addressing.createInetSocketAddressFromHostAndPortStr(s));
+    }
+
+    client = new MemcachedClient(builder.build(), serverAddresses);
+  }
+
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey,
+                         Cacheable buf,
+                         boolean inMemory,
+                         boolean cacheDataInL1) {
+    cacheBlock(cacheKey, buf);
+  }
+
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
+    if (buf instanceof HFileBlock) {
+      client.add(cacheKey.toString(), MAX_SIZE, (HFileBlock) buf, tc);
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("MemcachedBlockCache can not cache Cacheable's of type "
+            + buf.getClass().toString());
+      }
+    }
+  }
+
+  @Override
+  public Cacheable getBlock(BlockCacheKey cacheKey, boolean caching,
+                            boolean repeat, boolean updateCacheMetrics) {
+    // Assume that nothing is in the block cache.
+    HFileBlock result = null;
+
+    try (TraceScope traceScope = Trace.startSpan("MemcachedBlockCache.getBlock")) {
+      result = client.get(cacheKey.toString(), tc);
+    } catch (Exception e) {
+      // Catch a pretty broad set of exceptions to limit any changes in the memcached client
+      // and how it handles failures from leaking into the read path.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exception pulling from memcached [ "
+            + cacheKey.toString()
+            + " ]. Treating as a miss.", e);
+      }
+      result = null;
+    } finally {
+      // Update stats if this request doesn't have it turned off 100% of the time
+      if (updateCacheMetrics) {
+        if (result == null) {
+          cacheStats.miss(caching);
+        } else {
+          cacheStats.hit(caching);
+        }
+      }
+    }
+
+
+    return result;
+  }
+
+  @Override
+  public boolean evictBlock(BlockCacheKey cacheKey) {
+    try {
+      cacheStats.evict();
+      return client.delete(cacheKey.toString()).get();
+    } catch (InterruptedException e) {
+      LOG.warn("Error deleting " + cacheKey.toString(), e);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Error deleting " + cacheKey.toString(), e);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * This method does nothing so that memcached can handle all evictions.
+   */
+  @Override
+  public int evictBlocksByHfileName(String hfileName) {
+    return 0;
+  }
+
+  @Override
+  public CacheStats getStats() {
+    return cacheStats;
+  }
+
+  @Override
+  public void shutdown() {
+    client.shutdown();
+  }
+
+  @Override
+  public long size() {
+    return 0;
+  }
+
+  @Override
+  public long getFreeSize() {
+    return 0;
+  }
+
+  @Override
+  public long getCurrentSize() {
+    return 0;
+  }
+
+  @Override
+  public long getBlockCount() {
+    return 0;
+  }
+
+  @Override
+  public Iterator<CachedBlock> iterator() {
+    return new Iterator<CachedBlock>() {
+      @Override
+      public boolean hasNext() {
+        return false;
+      }
+
+      @Override
+      public CachedBlock next() {
+        throw new NoSuchElementException("MemcachedBlockCache can't iterate over blocks.");
+      }
+
+      @Override
+      public void remove() {
+
+      }
+    };
+  }
+
+  @Override
+  public BlockCache[] getBlockCaches() {
+    return null;
+  }
+
+  /**
+   * Class to encode and decode an HFileBlock to and from memcached's resulting byte arrays.
+   */
+  private static class HFileBlockTranscoder implements Transcoder<HFileBlock> {
+
+    @Override
+    public boolean asyncDecode(CachedData d) {
+      return false;
+    }
+
+    @Override
+    public CachedData encode(HFileBlock block) {
+      ByteBuffer bb = ByteBuffer.allocate(block.getSerializedLength());
+      block.serialize(bb);
+      return new CachedData(0, bb.array(), CachedData.MAX_SIZE);
+    }
+
+    @Override
+    public HFileBlock decode(CachedData d) {
+      try {
+        ByteBuffer buf = ByteBuffer.wrap(d.getData());
+        return (HFileBlock) HFileBlock.blockDeserializer.deserialize(buf, true);
+      } catch (IOException e) {
+        LOG.warn("Error deserializing data from memcached",e);
+      }
+      return null;
+    }
+
+    @Override
+    public int getMaxSize() {
+      return MAX_SIZE;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
index c5fcc3c..ce78a37 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheConfig.java
@@ -283,9 +283,9 @@ public class TestCacheConfig {
     // TODO: Assert sizes allocated are right and proportions.
     LruBlockCache lbc = (LruBlockCache)cc.getBlockCache();
     assertEquals(lruExpectedSize, lbc.getMaxSize());
-    BucketCache bc = lbc.getVictimHandler();
+    BlockCache bc = lbc.getVictimHandler();
     // getMaxSize comes back in bytes but we specified size in MB
-    assertEquals(bcExpectedSize, bc.getMaxSize());
+    assertEquals(bcExpectedSize, ((BucketCache) bc).getMaxSize());
     // Test the L1+L2 deploy works as we'd expect with blocks evicted from L1 going to L2.
     long initialL1BlockCount = lbc.getBlockCount();
     long initialL2BlockCount = bc.getBlockCount();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a4a235b8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d5b974c..b6175e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1174,6 +1174,7 @@
     <netty.version>4.0.23.Final</netty.version>
     <joni.version>2.1.2</joni.version>
     <jcodings.version>1.0.8</jcodings.version>
+    <spy.version>2.11.6</spy.version>
     <!-- Plugin Dependencies -->
     <maven.assembly.version>2.4</maven.assembly.version>
     <maven.antrun.version>1.6</maven.antrun.version>
@@ -1665,6 +1666,12 @@
         <artifactId>disruptor</artifactId>
         <version>${disruptor.version}</version>
       </dependency>
+      <dependency>
+        <groupId>net.spy</groupId>
+        <artifactId>spymemcached</artifactId>
+        <version>${spy.version}</version>
+        <optional>true</optional>
+      </dependency>
       <dependency>
         <groupId>org.jmock</groupId>
         <artifactId>jmock-junit4</artifactId>
