Author: mbautin
Date: Thu Feb  2 22:54:18 2012
New Revision: 1239905

URL: http://svn.apache.org/viewvc?rev=1239905&view=rev
Log:
[jira] [HBASE-5230] [89-fb] Ensure we do not cache data blocks on compaction

Summary:
This is a port of https://reviews.facebook.net/D1353 to 89-fb adding a unit
test that checks that we do not cache-on-write data blocks on compaction, and a
fix to make the unit test pass. This also includes a few pieces of code from
trunk, e.g. TestFromClientSide.testCacheOnWriteEvictOnClose and some methods
that it relies on, that previously were not there in 89-fb.

Submitting as an internal diff for now since there are some problems with the
apache_sync.py tool that syncs patches to hbase-89-fb-apache.

Test Plan: Unit tests, dev cluster

Reviewers: kannan, liyintang, kranganathan, nspiegelberg

Reviewed By: kannan

CC: hbase-eng@lists

Differential Revision: https://phabricator.fb.com/D400208

Revert Plan: OK

Modified:
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
    
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
    
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
(original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
Thu Feb  2 22:54:18 2012
@@ -44,6 +44,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -380,8 +381,9 @@ public class HTable implements HTableInt
    * @return A map of HRegionInfo with it's server address
    * @throws IOException if a remote or network exception occurs
    */
-  public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException {
-    final Map<HRegionInfo, HServerAddress> regionMap =
+  public NavigableMap<HRegionInfo, HServerAddress> getRegionsInfo()
+      throws IOException {
+    final NavigableMap<HRegionInfo, HServerAddress> regionMap =
       new TreeMap<HRegionInfo, HServerAddress>();
 
     MetaScannerVisitor visitor = new MetaScannerVisitor() {

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java
 Thu Feb  2 22:54:18 2012
@@ -241,7 +241,7 @@ public class HFileReaderV1 extends Abstr
       HFileBlock hfileBlock = fsBlockReader.readBlockData(offset,
           nextOffset - offset, 
metaBlockIndexReader.getRootBlockDataSize(block),
           true);
-      configureWithSchema(hfileBlock);
+      passSchemaMetricsTo(hfileBlock);
       hfileBlock.expectType(BlockType.META);
 
       long delta = System.currentTimeMillis() - now;
@@ -316,7 +316,7 @@ public class HFileReaderV1 extends Abstr
 
       HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset
           - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread);
-      configureWithSchema(hfileBlock);
+      passSchemaMetricsTo(hfileBlock);
       hfileBlock.expectType(BlockType.DATA);
       ByteBuffer buf = hfileBlock.getBufferWithoutHeader();
 

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java
 Thu Feb  2 22:54:18 2012
@@ -196,7 +196,7 @@ public class HFileReaderV2 extends Abstr
 
       HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset,
           blockSize, -1, true);
-      configureWithSchema(metaBlock);
+      passSchemaMetricsTo(metaBlock);
 
       long delta = System.currentTimeMillis() - now;
       HFile.readTime += delta;
@@ -246,7 +246,6 @@ public class HFileReaderV2 extends Abstr
     IdLock.Entry lockEntry = offsetLock.getLockEntry(dataBlockOffset);
     try {
       // Check cache for block. If found return.
-      cacheBlock &= cacheConf.shouldCacheDataOnRead();
       if (cacheConf.isBlockCacheEnabled()) {
         HFileBlock cachedBlock =
           (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, 
cacheBlock);
@@ -261,23 +260,24 @@ public class HFileReaderV2 extends Abstr
 
       // Load block from filesystem.
       long now = System.currentTimeMillis();
-      HFileBlock dataBlock = fsBlockReader.readBlockData(dataBlockOffset,
+      HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset,
           onDiskBlockSize, -1, pread);
-      configureWithSchema(dataBlock);
-      BlockCategory blockCategory = dataBlock.getBlockType().getCategory();
+      passSchemaMetricsTo(hfileBlock);
+      BlockCategory blockCategory = hfileBlock.getBlockType().getCategory();
 
       long delta = System.currentTimeMillis() - now;
       HFile.readTime += delta;
       HFile.readOps++;
       getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta);
 
-      // Cache the block
-      if (cacheBlock) {
-        cacheConf.getBlockCache().cacheBlock(cacheKey, dataBlock,
+      // Cache the block if necessary
+      if (cacheBlock && cacheConf.shouldCacheBlockOnRead(
+              hfileBlock.getBlockType().getCategory())) {
+        cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock,
             cacheConf.isInMemory());
       }
 
-      return dataBlock;
+      return hfileBlock;
     } finally {
       offsetLock.releaseLockEntry(lockEntry);
     }

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
 Thu Feb  2 22:54:18 2012
@@ -227,7 +227,7 @@ public class HFileWriterV1 extends Abstr
       HFileBlock cBlock = new HFileBlock(BlockType.DATA,
           (int) (outputStream.getPos() - blockBegin), bytes.length, -1,
           ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin);
-      configureWithSchema(cBlock);
+      passSchemaMetricsTo(cBlock);
       cacheConf.getBlockCache().cacheBlock(
           HFile.getBlockCacheKey(name, blockBegin),cBlock);
       baosDos.close();

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
 Thu Feb  2 22:54:18 2012
@@ -257,7 +257,7 @@ public class HFileWriterV2 extends Abstr
     HFile.writeOps++;
 
     if (cacheConf.shouldCacheDataOnWrite()) {
-      cacheBlockOnWrite(lastDataBlockOffset);
+      doCacheOnWrite(lastDataBlockOffset);
     }
   }
 
@@ -274,17 +274,23 @@ public class HFileWriterV2 extends Abstr
             fsBlockWriter.getUncompressedSizeWithoutHeader());
 
         if (cacheThisBlock) {
-          cacheBlockOnWrite(offset);
+          doCacheOnWrite(offset);
         }
       }
     }
   }
 
-  private void cacheBlockOnWrite(long offset) {
-    final HFileBlock cBlock = fsBlockWriter.getBlockForCaching();
-    configureWithSchema(cBlock);
-    cacheConf.getBlockCache().cacheBlock(HFile.getBlockCacheKey(name, offset),
-        cBlock);
+  /**
+   * Caches the last written HFile block.
+   * @param offset the offset of the block we want to cache. Used to determine
+   *          the cache key.
+   */
+  private void doCacheOnWrite(long offset) {
+    // Cache this block on write.
+    HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching();
+    passSchemaMetricsTo(cacheFormatBlock);
+    cacheConf.getBlockCache().cacheBlock(
+        HFile.getBlockCacheKey(name, offset), cacheFormatBlock);
   }
 
   /**

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
 Thu Feb  2 22:54:18 2012
@@ -20,15 +20,17 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import java.lang.ref.WeakReference;
+import java.util.EnumMap;
+import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -845,4 +847,14 @@ public class LruBlockCache implements Bl
     return fileNames;
   }
 
+  Map<BlockType, Integer> getBlockTypeCountsForTest() {
+    Map<BlockType, Integer> counts =
+        new EnumMap<BlockType, Integer>(BlockType.class);
+    for (CachedBlock cb : map.values()) {
+      BlockType blockType = ((HFileBlock) cb.getBuffer()).getBlockType();
+      Integer count = counts.get(blockType);
+      counts.put(blockType, (count == null ? 0 : count) + 1);
+    }
+    return counts;
+  }
 }

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 Thu Feb  2 22:54:18 2012
@@ -2604,6 +2604,11 @@ public class HRegionServer implements HR
     return sortedRegions;
   }
 
+  public HRegion getOnlineRegionByFullName(final String regionName) {
+    return this.onlineRegions.get(Bytes.mapKey(Bytes.toBytes(
+        regionName)));
+  }
+
   /**
    * @param regionName
    * @return HRegion for the passed <code>regionName</code> or null if named

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
 Thu Feb  2 22:54:18 2012
@@ -658,10 +658,34 @@ public class Store extends SchemaConfigu
    */
   private StoreFile.Writer createWriterInTmp(long maxKeyCount)
   throws IOException {
-    return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize,
-        this.compression, this.comparator, this.conf, this.cacheConf,
-        this.family.getBloomFilterType(), 
this.family.getBloomFilterErrorRate(),
-        maxKeyCount, region.getFavoredNodes());
+    return createWriterInTmp(maxKeyCount, this.compression, false);
+  }
+
+  /*
+   * @param maxKeyCount
+   * @param compression Compression algorithm to use
+   * @param isCompaction whether we are creating a new file in a compaction
+   * @return Writer for a new StoreFile in the tmp dir.
+   */
+  private StoreFile.Writer createWriterInTmp(long maxKeyCount,
+    Compression.Algorithm compression, boolean isCompaction)
+  throws IOException {
+    final CacheConfig writerCacheConf;
+    if (isCompaction) {
+      // Don't cache data on write on compactions.
+      writerCacheConf = new CacheConfig(cacheConf);
+      writerCacheConf.setCacheDataOnWrite(false);
+    } else {
+      writerCacheConf = cacheConf;
+    }
+    StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(),
+        blocksize, compression, comparator, conf, writerCacheConf,
+        family.getBloomFilterType(), maxKeyCount);
+    // The store file writer's path does not include the CF name, so we need
+    // to configure the HFile writer directly.
+    SchemaConfigured sc = (SchemaConfigured) w.writer;
+    passSchemaMetricsTo(sc);
+    return w;
   }
 
   /*
@@ -1295,7 +1319,7 @@ public class Store extends SchemaConfigu
           hasMore = scanner.next(kvs, 1);
           if (!kvs.isEmpty()) {
             if (writer == null) {
-              writer = createWriterInTmp(maxKeyCount);
+              writer = createWriterInTmp(maxKeyCount, compression, true);
             }
             // output to writer:
             for (KeyValue kv : kvs) {
@@ -1724,7 +1748,7 @@ public class Store extends SchemaConfigu
     return storeSize;
   }
 
-  void triggerMajorCompaction() {
+  public void triggerMajorCompaction() {
     this.forceMajor = true;
   }
 
@@ -1929,6 +1953,13 @@ public class Store extends SchemaConfigu
     return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
   }
 
+  /**
+   * Used for tests. Get the cache configuration for this Store.
+   */
+  public CacheConfig getCacheConfig() {
+    return this.cacheConf;
+  }
+
   public static final long FIXED_OVERHEAD = ClassSize.align(
       new SchemaConfigured().heapSize() + (15 * ClassSize.REFERENCE) +
       (7 * Bytes.SIZEOF_LONG) + (6 * Bytes.SIZEOF_INT) + (2 * 
Bytes.SIZEOF_BOOLEAN));

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java
 Thu Feb  2 22:54:18 2012
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
 import org.apache.hadoop.hbase.util.ClassSize;
 
@@ -123,7 +122,7 @@ public class SchemaConfigured implements
    * current table and column family name, and the associated collection of
    * metrics.
    */
-  public void configureWithSchema(SchemaConfigured block) {
+  public void passSchemaMetricsTo(SchemaConfigured block) {
     SchemaConfigured upcast = block;  // need this to assign private fields
     upcast.tableName = tableName;
     upcast.cfName = cfName;

Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
 (original)
+++ 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java
 Thu Feb  2 22:54:18 2012
@@ -208,9 +208,13 @@ public class SchemaMetrics {
       "hbase.metrics.showTableName";
 
   // Global variables
-  /** All instances of this class */
+  /**
+   * Maps a string key consisting of table name and column family name, with
+   * table name optionally replaced with {@link #TOTAL_KEY} if per-table
+   * metrics are disabled, to an instance of this class.
+   */
   private static final ConcurrentHashMap<String, SchemaMetrics>
-      cfToMetrics = new ConcurrentHashMap<String, SchemaMetrics>();
+      tableAndFamilyToMetrics = new ConcurrentHashMap<String, SchemaMetrics>();
 
   /** Metrics for all tables and column families. */
   // This has to be initialized after cfToMetrics.
@@ -306,14 +310,14 @@ public class SchemaMetrics {
     tableName = getEffectiveTableName(tableName);
 
     final String instanceKey = tableName + "\t" + cfName;
-    SchemaMetrics schemaMetrics = cfToMetrics.get(instanceKey);
+    SchemaMetrics schemaMetrics = tableAndFamilyToMetrics.get(instanceKey);
     if (schemaMetrics != null) {
       return schemaMetrics;
     }
 
     schemaMetrics = new SchemaMetrics(tableName, cfName);
-    SchemaMetrics existingMetrics = cfToMetrics.putIfAbsent(instanceKey,
-        schemaMetrics);
+    SchemaMetrics existingMetrics =
+        tableAndFamilyToMetrics.putIfAbsent(instanceKey, schemaMetrics);
     return existingMetrics != null ? existingMetrics : schemaMetrics;
   }
 
@@ -689,7 +693,7 @@ public class SchemaMetrics {
 
   public static Map<String, Long> getMetricsSnapshot() {
     Map<String, Long> metricsSnapshot = new TreeMap<String, Long>();
-    for (SchemaMetrics cfm : cfToMetrics.values()) {
+    for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) {
       for (String metricName : cfm.getAllMetricNames()) {
         long metricValue;
         if (isTimeVaryingKey(metricName)) {
@@ -746,7 +750,7 @@ public class SchemaMetrics {
     final Set<String> allKeys = new TreeSet<String>(oldMetrics.keySet());
     allKeys.addAll(newMetrics.keySet());
 
-    for (SchemaMetrics cfm : cfToMetrics.values()) {
+    for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) {
       for (String metricName : cfm.getAllMetricNames()) {
         if (metricName.startsWith(CF_PREFIX + CF_PREFIX)) {
           throw new AssertionError("Column family prefix used twice: " +
@@ -841,4 +845,16 @@ public class SchemaMetrics {
     useTableNameGlobally = useTableNameNew;
   }
 
+  /** Formats the given map of metrics in a human-readable way. */
+  public static String formatMetrics(Map<String, Long> metrics) {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+      if (sb.length() > 0) {
+        sb.append('\n');
+      }
+      sb.append(entry.getKey() + " : " + entry.getValue());
+    }
+    return sb.toString();
+  }
+
 }

Modified: 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
 (original)
+++ 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
 Thu Feb  2 22:54:18 2012
@@ -55,12 +55,15 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
@@ -759,6 +762,30 @@ public class HBaseTestingUtility {
   }
 
   /**
+   * Tool to get the reference to the region server object that holds the
+   * region of the specified user table.
+   * It first searches for the meta rows that contain the region of the
+   * specified table, then gets the index of that RS, and finally retrieves
+   * the RS's reference.
+   * @param tableName user table to lookup in .META.
+   * @return region server that holds it, null if the row doesn't exist
+   * @throws IOException
+   */
+  public HRegionServer getRSForFirstRegionInTable(byte[] tableName)
+      throws IOException {
+    List<byte[]> metaRows = getMetaTableRows(tableName);
+    if (metaRows == null || metaRows.isEmpty()) {
+      return null;
+    }
+    LOG.debug("Found " + metaRows.size() + " rows for table " +
+      Bytes.toString(tableName));
+    byte [] firstrow = metaRows.get(0);
+    LOG.debug("FirstRow=" + Bytes.toString(firstrow));
+    int index = hbaseCluster.getServerWith(firstrow);
+    return hbaseCluster.getRegionServerThreads().get(index).getRegionServer();
+  }
+
+  /**
    * Starts a <code>MiniMRCluster</code> with a default number of
    * <code>TaskTracker</code>'s.
    *
@@ -1263,4 +1290,25 @@ public class HBaseTestingUtility {
     return port;
   }
 
+  public HRegion createTestRegion(String tableName, String cfName,
+      Compression.Algorithm comprAlgo, BloomType bloomType, int maxVersions,
+      int blockSize) throws IOException {
+    HColumnDescriptor hcd =
+      new HColumnDescriptor(Bytes.toBytes(cfName), maxVersions,
+          comprAlgo.getName(),
+          HColumnDescriptor.DEFAULT_IN_MEMORY,
+          HColumnDescriptor.DEFAULT_BLOCKCACHE,
+          HColumnDescriptor.DEFAULT_TTL,
+          bloomType.toString());
+    hcd.setBlocksize(HFile.DEFAULT_BLOCKSIZE);
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(hcd);
+    HRegionInfo info =
+        new HRegionInfo(htd, null, null, false);
+    HRegion region =
+        HRegion.createHRegion(info, getTestDir("test_region_" +
+            tableName), getConfiguration());
+    return region;
+  }
+
 }

Modified: 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
 (original)
+++ 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
 Thu Feb  2 22:54:18 2012
@@ -69,8 +69,12 @@ import org.apache.hadoop.hbase.filter.Ro
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
@@ -4118,6 +4122,103 @@ public class TestFromClientSide {
   }
 
   @Test
+  public void testCacheOnWriteEvictOnClose() throws Exception {
+    byte [] tableName = Bytes.toBytes("testCOWEOCfromClient");
+    byte [] data = Bytes.toBytes("data");
+    HTable table = TEST_UTIL.createTable(tableName, new byte [][] {FAMILY});
+    // get the block cache and region
+    String regionName =
+        table.getRegionsInfo().firstKey().getRegionNameAsString();
+    HRegion region = TEST_UTIL.getRSForFirstRegionInTable(
+        tableName).getOnlineRegionByFullName(regionName);
+    Store store = region.getStores().values().iterator().next();
+    CacheConfig cacheConf = store.getCacheConfig();
+    cacheConf.setCacheDataOnWrite(true);
+    cacheConf.setEvictOnClose(true);
+    BlockCache cache = cacheConf.getBlockCache();
+
+    // establish baseline stats
+    long startBlockCount = cache.getBlockCount();
+    long startBlockHits = cache.getStats().getHitCount();
+    long startBlockMiss = cache.getStats().getMissCount();
+    // insert data
+    Put put = new Put(ROW);
+    put.add(FAMILY, QUALIFIER, data);
+    table.put(put);
+    assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
+    // data was in memstore so don't expect any changes
+    assertEquals(startBlockCount, cache.getBlockCount());
+    assertEquals(startBlockHits, cache.getStats().getHitCount());
+    assertEquals(startBlockMiss, cache.getStats().getMissCount());
+    // flush the data
+    System.out.println("Flushing cache");
+    region.flushcache();
+    // expect one more block in cache, no change in hits/misses
+    long expectedBlockCount = startBlockCount + 1;
+    long expectedBlockHits = startBlockHits;
+    long expectedBlockMiss = startBlockMiss;
+    assertEquals(expectedBlockCount, cache.getBlockCount());
+    assertEquals(expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    // read the data and expect same blocks, one new hit, no misses
+    assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
+    assertEquals(expectedBlockCount, cache.getBlockCount());
+    assertEquals(++expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    // insert a second column, read the row, no new blocks, one new hit
+    byte [] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER);
+    byte [] data2 = Bytes.add(data, data);
+    put = new Put(ROW);
+    put.add(FAMILY, QUALIFIER2, data2);
+    table.put(put);
+    Result r = table.get(new Get(ROW));
+    assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
+    assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
+    assertEquals(expectedBlockCount, cache.getBlockCount());
+    assertEquals(++expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    // flush, one new block
+    System.out.println("Flushing cache");
+    region.flushcache();
+    assertEquals(++expectedBlockCount, cache.getBlockCount());
+    assertEquals(expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    // compact, net minus two blocks, two hits, no misses
+    System.out.println("Compacting");
+    assertEquals(2, store.getNumberOfStoreFiles());
+    store.triggerMajorCompaction();
+    region.compactStores();
+    waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max
+    assertEquals(1, store.getNumberOfStoreFiles());
+    expectedBlockCount -= 2; // evicted two blocks, cached none
+    assertEquals(expectedBlockCount, cache.getBlockCount());
+    expectedBlockHits += 2;
+    assertEquals(expectedBlockMiss, cache.getStats().getMissCount());
+    assertEquals(expectedBlockHits, cache.getStats().getHitCount());
+    // read the row, this should be a cache miss because we don't cache data
+    // blocks on compaction
+    r = table.get(new Get(ROW));
+    assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data));
+    assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2));
+    expectedBlockCount += 1; // cached one data block
+    assertEquals(expectedBlockCount, cache.getBlockCount());
+    assertEquals(expectedBlockHits, cache.getStats().getHitCount());
+    assertEquals(++expectedBlockMiss, cache.getStats().getMissCount());
+  }
+
+  private void waitForStoreFileCount(Store store, int count, int timeout)
+  throws InterruptedException {
+    long start = System.currentTimeMillis();
+    while (start + timeout > System.currentTimeMillis() &&
+        store.getNumberOfStoreFiles() != count) {
+      Thread.sleep(100);
+    }
+    System.out.println("start=" + start + ", now=" +
+        System.currentTimeMillis() + ", cur=" + store.getNumberOfStoreFiles());
+    assertEquals(count, store.getNumberOfStoreFiles());
+  }
+
+  @Test
   public void testMajorCompactCFRegion() throws Exception {
     compactCFRegion(1);
   }

Modified: 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1239905&r1=1239904&r2=1239905&view=diff
==============================================================================
--- 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
 (original)
+++ 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
 Thu Feb  2 22:54:18 2012
@@ -20,11 +20,16 @@
 
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -33,25 +38,23 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
-import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileBlock;
-import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
-import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
+import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-import static org.junit.Assert.*;
 
 /**
  * Tests {@link HFile} cache-on-write functionality for the following block
@@ -78,6 +81,7 @@ public class TestCacheOnWrite {
   private static final int NUM_KV = 25000;
   private static final int INDEX_BLOCK_SIZE = 512;
   private static final int BLOOM_BLOCK_SIZE = 4096;
+  private static final BloomType BLOOM_TYPE = StoreFile.BloomType.ROWCOL;
 
   private static enum CacheOnWriteType {
     DATA_BLOCKS(BlockType.DATA, CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY),
@@ -152,7 +156,7 @@ public class TestCacheOnWrite {
   }
 
+  // Writes a store file with cache-on-write enabled, then reads it back and
+  // checks the cache contents. Renamed from testCacheOnWrite to distinguish
+  // it from the new compaction-path test below.
   @Test
-  public void testCacheOnWrite() throws IOException {
+  public void testStoreFileCacheOnWrite() throws IOException {
     writeStoreFile();
     readStoreFile();
   }
@@ -202,7 +206,7 @@ public class TestCacheOnWrite {
         "test_cache_on_write");
     StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir,
         DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf,
-        cacheConf, StoreFile.BloomType.ROWCOL, NUM_KV);
+        cacheConf, BLOOM_TYPE, NUM_KV);
 
     final int rowLen = 32;
     for (int i = 0; i < NUM_KV; ++i) {
@@ -223,4 +227,54 @@ public class TestCacheOnWrite {
     storeFilePath = sfw.getPath();
   }
 
+  /**
+   * Verifies that data blocks written during a compaction are NOT added to
+   * the block cache (the core fix of HBASE-5230): after flushing five store
+   * files and compacting them with a cleared cache, the cache must contain
+   * no DATA-type blocks.
+   */
+  @Test
+  public void testNotCachingDataBlocksDuringCompaction() throws IOException {
+    // TODO: need to change this test if we add a cache size threshold for
+    // compactions, or if we implement some other kind of intelligent logic for
+    // deciding what blocks to cache-on-write on compaction.
+    final String table = "CompactionCacheOnWrite";
+    final String cf = "myCF";
+    final byte[] cfBytes = Bytes.toBytes(cf);
+    final int maxVersions = 3;
+    HRegion region = TEST_UTIL.createTestRegion(table, cf, compress,
+        BLOOM_TYPE, maxVersions, HFile.DEFAULT_BLOCKSIZE);
+    int rowIdx = 0;
+    long ts = EnvironmentEdgeManager.currentTimeMillis();
+    // Load data and flush five times to produce five store files, giving the
+    // subsequent compactStores() call real work to do.
+    for (int iFile = 0; iFile < 5; ++iFile) {
+      for (int iRow = 0; iRow < 500; ++iRow) {
+        // Cubing rowIdx spreads row keys non-uniformly across the key space.
+        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
+            iRow;
+        Put p = new Put(Bytes.toBytes(rowStr));
+        ++rowIdx;
+        for (int iCol = 0; iCol < 10; ++iCol) {
+          String qualStr = "col" + iCol;
+          String valueStr = "value_" + rowStr + "_" + qualStr;
+          // Five timestamps per cell exceeds maxVersions (3), so compaction
+          // also has excess versions to drop.
+          for (int iTS = 0; iTS < 5; ++iTS) {
+            p.add(cfBytes, Bytes.toBytes(qualStr), ts++,
+                Bytes.toBytes(valueStr));
+          }
+        }
+        region.put(p);
+      }
+      region.flushcache();
+    }
+    // Start from an empty cache so every cached block afterwards is
+    // attributable to the compaction itself.
+    LruBlockCache blockCache =
+        (LruBlockCache) new CacheConfig(conf).getBlockCache();
+    blockCache.clearCache();
+    assertEquals(0, blockCache.getBlockTypeCountsForTest().size());
+    // Snapshot schema metrics so only the compaction's deltas are validated.
+    Map<String, Long> metricsBefore = SchemaMetrics.getMetricsSnapshot();
+    region.compactStores();
+    LOG.debug("compactStores() returned");
+    SchemaMetrics.validateMetricChanges(metricsBefore);
+    Map<String, Long> compactionMetrics = SchemaMetrics.diffMetrics(
+        metricsBefore, SchemaMetrics.getMetricsSnapshot());
+    LOG.debug(SchemaMetrics.formatMetrics(compactionMetrics));
+    Map<BlockType, Integer> blockTypesInCache =
+        blockCache.getBlockTypeCountsForTest();
+    LOG.debug("Block types in cache: " + blockTypesInCache);
+    // The key assertion: compaction must not have cached any DATA blocks.
+    assertNull(blockTypesInCache.get(BlockType.DATA));
+    // NOTE(review): close()/shutdown() are skipped if an assertion above
+    // fails — consider a try/finally (or @After) for cleanup; verify whether
+    // later tests in this class depend on it.
+    region.close();
+    blockCache.shutdown();
+  }
+
 }


Reply via email to