Author: mbautin Date: Thu Feb 2 22:54:18 2012 New Revision: 1239905 URL: http://svn.apache.org/viewvc?rev=1239905&view=rev Log: [jira] [HBASE-5230] [89-fb] Ensure we do not cache data blocks on compaction
Summary: This is a port of https://reviews.facebook.net/D1353 to 89-fb adding a unit test that checks that we do not cache-on-write data blocks on compaction, and a fix to make the unit test pass. This also includes a few pieces of code from trunk, e.g. TestFromClientSide.testCacheOnWriteEvictOnClose and some methods that it relies on, that previously were not there in 89-fb. Submitting as an internal diff for now since there are some problems with the apache_sync.py tool that syncs patches to hbase-89-fb-apache. Test Plan: Unit tests, dev cluster Reviewers: kannan, liyintang, kranganathan, nspiegelberg Reviewed By: kannan CC: hbase-eng@lists Differential Revision: https://phabricator.fb.com/D400208 Revert Plan: OK Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Feb 2 22:54:18 2012 @@ -44,6 +44,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -380,8 +381,9 @@ public class HTable implements HTableInt * @return A map of HRegionInfo with it's server address * @throws IOException if a remote or network exception occurs */ - public Map<HRegionInfo, HServerAddress> getRegionsInfo() throws IOException { - final Map<HRegionInfo, HServerAddress> regionMap = + public NavigableMap<HRegionInfo, HServerAddress> getRegionsInfo() + throws IOException { + final NavigableMap<HRegionInfo, HServerAddress> regionMap = new TreeMap<HRegionInfo, HServerAddress>(); MetaScannerVisitor visitor = new MetaScannerVisitor() { Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV1.java Thu Feb 2 22:54:18 2012 @@ -241,7 +241,7 @@ public class HFileReaderV1 extends Abstr HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - offset, metaBlockIndexReader.getRootBlockDataSize(block), true); - configureWithSchema(hfileBlock); + passSchemaMetricsTo(hfileBlock); hfileBlock.expectType(BlockType.META); long delta = System.currentTimeMillis() - now; @@ -316,7 +316,7 @@ public class HFileReaderV1 extends Abstr HFileBlock hfileBlock = fsBlockReader.readBlockData(offset, nextOffset - offset, dataBlockIndexReader.getRootBlockDataSize(block), pread); - configureWithSchema(hfileBlock); + passSchemaMetricsTo(hfileBlock); hfileBlock.expectType(BlockType.DATA); ByteBuffer buf = hfileBlock.getBufferWithoutHeader(); Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderV2.java Thu Feb 2 22:54:18 2012 @@ -196,7 +196,7 @@ public class HFileReaderV2 extends Abstr HFileBlock metaBlock = fsBlockReader.readBlockData(metaBlockOffset, blockSize, -1, true); - configureWithSchema(metaBlock); + passSchemaMetricsTo(metaBlock); long delta = System.currentTimeMillis() - now; HFile.readTime += delta; @@ -246,7 +246,6 @@ public class HFileReaderV2 extends Abstr IdLock.Entry lockEntry = offsetLock.getLockEntry(dataBlockOffset); try { // Check cache for block. If found return. - cacheBlock &= cacheConf.shouldCacheDataOnRead(); if (cacheConf.isBlockCacheEnabled()) { HFileBlock cachedBlock = (HFileBlock) cacheConf.getBlockCache().getBlock(cacheKey, cacheBlock); @@ -261,23 +260,24 @@ public class HFileReaderV2 extends Abstr // Load block from filesystem. long now = System.currentTimeMillis(); - HFileBlock dataBlock = fsBlockReader.readBlockData(dataBlockOffset, + HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, -1, pread); - configureWithSchema(dataBlock); - BlockCategory blockCategory = dataBlock.getBlockType().getCategory(); + passSchemaMetricsTo(hfileBlock); + BlockCategory blockCategory = hfileBlock.getBlockType().getCategory(); long delta = System.currentTimeMillis() - now; HFile.readTime += delta; HFile.readOps++; getSchemaMetrics().updateOnCacheMiss(blockCategory, isCompaction, delta); - // Cache the block - if (cacheBlock) { - cacheConf.getBlockCache().cacheBlock(cacheKey, dataBlock, + // Cache the block if necessary + if (cacheBlock && cacheConf.shouldCacheBlockOnRead( + hfileBlock.getBlockType().getCategory())) { + cacheConf.getBlockCache().cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory()); } - return dataBlock; + return hfileBlock; } finally { offsetLock.releaseLockEntry(lockEntry); } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java Thu Feb 2 22:54:18 2012 @@ -227,7 +227,7 @@ public class HFileWriterV1 extends Abstr HFileBlock cBlock = new HFileBlock(BlockType.DATA, (int) (outputStream.getPos() - blockBegin), bytes.length, -1, ByteBuffer.wrap(bytes, 0, bytes.length), true, blockBegin); - configureWithSchema(cBlock); + passSchemaMetricsTo(cBlock); cacheConf.getBlockCache().cacheBlock( HFile.getBlockCacheKey(name, blockBegin),cBlock); baosDos.close(); Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java Thu Feb 2 22:54:18 2012 @@ -257,7 +257,7 @@ public class HFileWriterV2 extends Abstr HFile.writeOps++; if (cacheConf.shouldCacheDataOnWrite()) { - cacheBlockOnWrite(lastDataBlockOffset); + doCacheOnWrite(lastDataBlockOffset); } } @@ -274,17 +274,23 @@ public class HFileWriterV2 extends Abstr fsBlockWriter.getUncompressedSizeWithoutHeader()); if (cacheThisBlock) { - cacheBlockOnWrite(offset); + doCacheOnWrite(offset); } } } } - private void cacheBlockOnWrite(long offset) { - final HFileBlock cBlock = fsBlockWriter.getBlockForCaching(); - configureWithSchema(cBlock); - cacheConf.getBlockCache().cacheBlock(HFile.getBlockCacheKey(name, offset), - cBlock); + /** + * Caches the last written HFile block. + * @param offset the offset of the block we want to cache. Used to determine + * the cache key. + */ + private void doCacheOnWrite(long offset) { + // Cache this block on write. + HFileBlock cacheFormatBlock = fsBlockWriter.getBlockForCaching(); + passSchemaMetricsTo(cacheFormatBlock); + cacheConf.getBlockCache().cacheBlock( + HFile.getBlockCacheKey(name, offset), cacheFormatBlock); } /** Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java Thu Feb 2 22:54:18 2012 @@ -20,15 +20,17 @@ package org.apache.hadoop.hbase.io.hfile; import java.lang.ref.WeakReference; +import java.util.EnumMap; +import java.util.Map; import java.util.PriorityQueue; import java.util.SortedSet; import java.util.TreeSet; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -845,4 +847,14 @@ public class LruBlockCache implements Bl return fileNames; } + Map<BlockType, Integer> getBlockTypeCountsForTest() { + Map<BlockType, Integer> counts = + new EnumMap<BlockType, Integer>(BlockType.class); + for (CachedBlock cb : map.values()) { + BlockType blockType = ((HFileBlock) cb.getBuffer()).getBlockType(); + Integer count = counts.get(blockType); + counts.put(blockType, (count == null ? 0 : count) + 1); + } + return counts; + } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Feb 2 22:54:18 2012 @@ -2604,6 +2604,11 @@ public class HRegionServer implements HR return sortedRegions; } + public HRegion getOnlineRegionByFullName(final String regionName) { + return this.onlineRegions.get(Bytes.mapKey(Bytes.toBytes( + regionName))); + } + /** * @param regionName * @return HRegion for the passed <code>regionName</code> or null if named Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Thu Feb 2 22:54:18 2012 @@ -658,10 +658,34 @@ public class Store extends SchemaConfigu */ private StoreFile.Writer createWriterInTmp(long maxKeyCount) throws IOException { - return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize, - this.compression, this.comparator, this.conf, this.cacheConf, - this.family.getBloomFilterType(), this.family.getBloomFilterErrorRate(), - maxKeyCount, region.getFavoredNodes()); + return createWriterInTmp(maxKeyCount, this.compression, false); + } + + /* + * @param maxKeyCount + * @param compression Compression algorithm to use + * @param isCompaction whether we are creating a new file in a compaction + * @return Writer for a new StoreFile in the tmp dir. + */ + private StoreFile.Writer createWriterInTmp(long maxKeyCount, + Compression.Algorithm compression, boolean isCompaction) + throws IOException { + final CacheConfig writerCacheConf; + if (isCompaction) { + // Don't cache data on write on compactions. + writerCacheConf = new CacheConfig(cacheConf); + writerCacheConf.setCacheDataOnWrite(false); + } else { + writerCacheConf = cacheConf; + } + StoreFile.Writer w = StoreFile.createWriter(fs, region.getTmpDir(), + blocksize, compression, comparator, conf, writerCacheConf, + family.getBloomFilterType(), maxKeyCount); + // The store file writer's path does not include the CF name, so we need + // to configure the HFile writer directly. + SchemaConfigured sc = (SchemaConfigured) w.writer; + passSchemaMetricsTo(sc); + return w; } /* @@ -1295,7 +1319,7 @@ public class Store extends SchemaConfigu hasMore = scanner.next(kvs, 1); if (!kvs.isEmpty()) { if (writer == null) { - writer = createWriterInTmp(maxKeyCount); + writer = createWriterInTmp(maxKeyCount, compression, true); } // output to writer: for (KeyValue kv : kvs) { @@ -1724,7 +1748,7 @@ public class Store extends SchemaConfigu return storeSize; } - void triggerMajorCompaction() { + public void triggerMajorCompaction() { this.forceMajor = true; } @@ -1929,6 +1953,13 @@ public class Store extends SchemaConfigu return (storefiles.size() - filesCompacting.size()) > minFilesToCompact; } + /** + * Used for tests. Get the cache configuration for this Store. + */ + public CacheConfig getCacheConfig() { + return this.cacheConf; + } + public static final long FIXED_OVERHEAD = ClassSize.align( new SchemaConfigured().heapSize() + (15 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG) + (6 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN)); Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaConfigured.java Thu Feb 2 22:54:18 2012 @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileBlock; import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware; import org.apache.hadoop.hbase.util.ClassSize; @@ -123,7 +122,7 @@ public class SchemaConfigured implements * current table and column family name, and the associated collection of * metrics. */ - public void configureWithSchema(SchemaConfigured block) { + public void passSchemaMetricsTo(SchemaConfigured block) { SchemaConfigured upcast = block; // need this to assign private fields upcast.tableName = tableName; upcast.cfName = cfName; Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/SchemaMetrics.java Thu Feb 2 22:54:18 2012 @@ -208,9 +208,13 @@ public class SchemaMetrics { "hbase.metrics.showTableName"; // Global variables - /** All instances of this class */ + /** + * Maps a string key consisting of table name and column family name, with + * table name optionally replaced with {@link #TOTAL_KEY} if per-table + * metrics are disabled, to an instance of this class. + */ private static final ConcurrentHashMap<String, SchemaMetrics> - cfToMetrics = new ConcurrentHashMap<String, SchemaMetrics>(); + tableAndFamilyToMetrics = new ConcurrentHashMap<String, SchemaMetrics>(); /** Metrics for all tables and column families. */ // This has to be initialized after cfToMetrics. @@ -306,14 +310,14 @@ public class SchemaMetrics { tableName = getEffectiveTableName(tableName); final String instanceKey = tableName + "\t" + cfName; - SchemaMetrics schemaMetrics = cfToMetrics.get(instanceKey); + SchemaMetrics schemaMetrics = tableAndFamilyToMetrics.get(instanceKey); if (schemaMetrics != null) { return schemaMetrics; } schemaMetrics = new SchemaMetrics(tableName, cfName); - SchemaMetrics existingMetrics = cfToMetrics.putIfAbsent(instanceKey, - schemaMetrics); + SchemaMetrics existingMetrics = + tableAndFamilyToMetrics.putIfAbsent(instanceKey, schemaMetrics); return existingMetrics != null ? existingMetrics : schemaMetrics; } @@ -689,7 +693,7 @@ public class SchemaMetrics { public static Map<String, Long> getMetricsSnapshot() { Map<String, Long> metricsSnapshot = new TreeMap<String, Long>(); - for (SchemaMetrics cfm : cfToMetrics.values()) { + for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) { for (String metricName : cfm.getAllMetricNames()) { long metricValue; if (isTimeVaryingKey(metricName)) { @@ -746,7 +750,7 @@ public class SchemaMetrics { final Set<String> allKeys = new TreeSet<String>(oldMetrics.keySet()); allKeys.addAll(newMetrics.keySet()); - for (SchemaMetrics cfm : cfToMetrics.values()) { + for (SchemaMetrics cfm : tableAndFamilyToMetrics.values()) { for (String metricName : cfm.getAllMetricNames()) { if (metricName.startsWith(CF_PREFIX + CF_PREFIX)) { throw new AssertionError("Column family prefix used twice: " + @@ -841,4 +845,16 @@ public class SchemaMetrics { useTableNameGlobally = useTableNameNew; } + /** Formats the given map of metrics in a human-readable way. */ + public static String formatMetrics(Map<String, Long> metrics) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, Long> entry : metrics.entrySet()) { + if (sb.length() > 0) { + sb.append('\n'); + } + sb.append(entry.getKey() + " : " + entry.getValue()); + } + return sb.toString(); + } + } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Thu Feb 2 22:54:18 2012 @@ -55,12 +55,15 @@ import org.apache.hadoop.hbase.client.Re import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.Compression; +import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; @@ -759,6 +762,30 @@ public class HBaseTestingUtility { } /** + * Tool to get the reference to the region server object that holds the + * region of the specified user table. + * It first searches for the meta rows that contain the region of the + * specified table, then gets the index of that RS, and finally retrieves + * the RS's reference. + * @param tableName user table to lookup in .META. + * @return region server that holds it, null if the row doesn't exist + * @throws IOException + */ + public HRegionServer getRSForFirstRegionInTable(byte[] tableName) + throws IOException { + List<byte[]> metaRows = getMetaTableRows(tableName); + if (metaRows == null || metaRows.isEmpty()) { + return null; + } + LOG.debug("Found " + metaRows.size() + " rows for table " + + Bytes.toString(tableName)); + byte [] firstrow = metaRows.get(0); + LOG.debug("FirstRow=" + Bytes.toString(firstrow)); + int index = hbaseCluster.getServerWith(firstrow); + return hbaseCluster.getRegionServerThreads().get(index).getRegionServer(); + } + + /** * Starts a <code>MiniMRCluster</code> with a default number of * <code>TaskTracker</code>'s. * @@ -1263,4 +1290,25 @@ public class HBaseTestingUtility { return port; } + public HRegion createTestRegion(String tableName, String cfName, + Compression.Algorithm comprAlgo, BloomType bloomType, int maxVersions, + int blockSize) throws IOException { + HColumnDescriptor hcd = + new HColumnDescriptor(Bytes.toBytes(cfName), maxVersions, + comprAlgo.getName(), + HColumnDescriptor.DEFAULT_IN_MEMORY, + HColumnDescriptor.DEFAULT_BLOCKCACHE, + HColumnDescriptor.DEFAULT_TTL, + bloomType.toString()); + hcd.setBlocksize(HFile.DEFAULT_BLOCKSIZE); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(hcd); + HRegionInfo info = + new HRegionInfo(htd, null, null, false); + HRegion region = + HRegion.createHRegion(info, getTestDir("test_region_" + + tableName), getConfiguration()); + return region; + } + } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Thu Feb 2 22:54:18 2012 @@ -69,8 +69,12 @@ import org.apache.hadoop.hbase.filter.Ro import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.io.hfile.BlockCache; +import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; @@ -4118,6 +4122,103 @@ public class TestFromClientSide { } @Test + public void testCacheOnWriteEvictOnClose() throws Exception { + byte [] tableName = Bytes.toBytes("testCOWEOCfromClient"); + byte [] data = Bytes.toBytes("data"); + HTable table = TEST_UTIL.createTable(tableName, new byte [][] {FAMILY}); + // get the block cache and region + String regionName = + table.getRegionsInfo().firstKey().getRegionNameAsString(); + HRegion region = TEST_UTIL.getRSForFirstRegionInTable( + tableName).getOnlineRegionByFullName(regionName); + Store store = region.getStores().values().iterator().next(); + CacheConfig cacheConf = store.getCacheConfig(); + cacheConf.setCacheDataOnWrite(true); + cacheConf.setEvictOnClose(true); + BlockCache cache = cacheConf.getBlockCache(); + + // establish baseline stats + long startBlockCount = cache.getBlockCount(); + long startBlockHits = cache.getStats().getHitCount(); + long startBlockMiss = cache.getStats().getMissCount(); + // insert data + Put put = new Put(ROW); + put.add(FAMILY, QUALIFIER, data); + table.put(put); + assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); + // data was in memstore so don't expect any changes + assertEquals(startBlockCount, cache.getBlockCount()); + assertEquals(startBlockHits, cache.getStats().getHitCount()); + assertEquals(startBlockMiss, cache.getStats().getMissCount()); + // flush the data + System.out.println("Flushing cache"); + region.flushcache(); + // expect one more block in cache, no change in hits/misses + long expectedBlockCount = startBlockCount + 1; + long expectedBlockHits = startBlockHits; + long expectedBlockMiss = startBlockMiss; + assertEquals(expectedBlockCount, cache.getBlockCount()); + assertEquals(expectedBlockHits, cache.getStats().getHitCount()); + assertEquals(expectedBlockMiss, cache.getStats().getMissCount()); + // read the data and expect same blocks, one new hit, no misses + assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data)); + assertEquals(expectedBlockCount, cache.getBlockCount()); + assertEquals(++expectedBlockHits, cache.getStats().getHitCount()); + assertEquals(expectedBlockMiss, cache.getStats().getMissCount()); + // insert a second column, read the row, no new blocks, one new hit + byte [] QUALIFIER2 = Bytes.add(QUALIFIER, QUALIFIER); + byte [] data2 = Bytes.add(data, data); + put = new Put(ROW); + put.add(FAMILY, QUALIFIER2, data2); + table.put(put); + Result r = table.get(new Get(ROW)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + assertEquals(expectedBlockCount, cache.getBlockCount()); + assertEquals(++expectedBlockHits, cache.getStats().getHitCount()); + assertEquals(expectedBlockMiss, cache.getStats().getMissCount()); + // flush, one new block + System.out.println("Flushing cache"); + region.flushcache(); + assertEquals(++expectedBlockCount, cache.getBlockCount()); + assertEquals(expectedBlockHits, cache.getStats().getHitCount()); + assertEquals(expectedBlockMiss, cache.getStats().getMissCount()); + // compact, net minus two blocks, two hits, no misses + System.out.println("Compacting"); + assertEquals(2, store.getNumberOfStoreFiles()); + store.triggerMajorCompaction(); + region.compactStores(); + waitForStoreFileCount(store, 1, 10000); // wait 10 seconds max + assertEquals(1, store.getNumberOfStoreFiles()); + expectedBlockCount -= 2; // evicted two blocks, cached none + assertEquals(expectedBlockCount, cache.getBlockCount()); + expectedBlockHits += 2; + assertEquals(expectedBlockMiss, cache.getStats().getMissCount()); + assertEquals(expectedBlockHits, cache.getStats().getHitCount()); + // read the row, this should be a cache miss because we don't cache data + // blocks on compaction + r = table.get(new Get(ROW)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER), data)); + assertTrue(Bytes.equals(r.getValue(FAMILY, QUALIFIER2), data2)); + expectedBlockCount += 1; // cached one data block + assertEquals(expectedBlockCount, cache.getBlockCount()); + assertEquals(expectedBlockHits, cache.getStats().getHitCount()); + assertEquals(++expectedBlockMiss, cache.getStats().getMissCount()); + } + + private void waitForStoreFileCount(Store store, int count, int timeout) + throws InterruptedException { + long start = System.currentTimeMillis(); + while (start + timeout > System.currentTimeMillis() && + store.getNumberOfStoreFiles() != count) { + Thread.sleep(100); + } + System.out.println("start=" + start + ", now=" + + System.currentTimeMillis() + ", cur=" + store.getNumberOfStoreFiles()); + assertEquals(count, store.getNumberOfStoreFiles()); + } + + @Test public void testMajorCompactCFRegion() throws Exception { compactCFRegion(1); } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java?rev=1239905&r1=1239904&r2=1239905&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java (original) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java Thu Feb 2 22:54:18 2012 @@ -20,11 +20,16 @@ package org.apache.hadoop.hbase.io.hfile; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.EnumMap; import java.util.List; +import java.util.Map; import java.util.Random; import org.apache.commons.logging.Log; @@ -33,25 +38,23 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.io.hfile.Compression; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileBlock; -import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex; -import org.apache.hadoop.hbase.io.hfile.HFileReaderV2; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.regionserver.CreateRandomStoreFile; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; +import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; import org.apache.hadoop.hbase.util.BloomFilterFactory; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import static org.junit.Assert.*; /** * Tests {@link HFile} cache-on-write functionality for the following block @@ -78,6 +81,7 @@ public class TestCacheOnWrite { private static final int NUM_KV = 25000; private static final int INDEX_BLOCK_SIZE = 512; private static final int BLOOM_BLOCK_SIZE = 4096; + private static final BloomType BLOOM_TYPE = StoreFile.BloomType.ROWCOL; private static enum CacheOnWriteType { DATA_BLOCKS(BlockType.DATA, CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY), @@ -152,7 +156,7 @@ public class TestCacheOnWrite { } @Test - public void testCacheOnWrite() throws IOException { + public void testStoreFileCacheOnWrite() throws IOException { writeStoreFile(); readStoreFile(); } @@ -202,7 +206,7 @@ public class TestCacheOnWrite { "test_cache_on_write"); StoreFile.Writer sfw = StoreFile.createWriter(fs, storeFileParentDir, DATA_BLOCK_SIZE, compress, KeyValue.COMPARATOR, conf, - cacheConf, StoreFile.BloomType.ROWCOL, NUM_KV); + cacheConf, BLOOM_TYPE, NUM_KV); final int rowLen = 32; for (int i = 0; i < NUM_KV; ++i) { @@ -223,4 +227,54 @@ public class TestCacheOnWrite { storeFilePath = sfw.getPath(); } + @Test + public void testNotCachingDataBlocksDuringCompaction() throws IOException { + // TODO: need to change this test if we add a cache size threshold for + // compactions, or if we implement some other kind of intelligent logic for + // deciding what blocks to cache-on-write on compaction. + final String table = "CompactionCacheOnWrite"; + final String cf = "myCF"; + final byte[] cfBytes = Bytes.toBytes(cf); + final int maxVersions = 3; + HRegion region = TEST_UTIL.createTestRegion(table, cf, compress, + BLOOM_TYPE, maxVersions, HFile.DEFAULT_BLOCKSIZE); + int rowIdx = 0; + long ts = EnvironmentEdgeManager.currentTimeMillis(); + for (int iFile = 0; iFile < 5; ++iFile) { + for (int iRow = 0; iRow < 500; ++iRow) { + String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + + iRow; + Put p = new Put(Bytes.toBytes(rowStr)); + ++rowIdx; + for (int iCol = 0; iCol < 10; ++iCol) { + String qualStr = "col" + iCol; + String valueStr = "value_" + rowStr + "_" + qualStr; + for (int iTS = 0; iTS < 5; ++iTS) { + p.add(cfBytes, Bytes.toBytes(qualStr), ts++, + Bytes.toBytes(valueStr)); + } + } + region.put(p); + } + region.flushcache(); + } + LruBlockCache blockCache = + (LruBlockCache) new CacheConfig(conf).getBlockCache(); + blockCache.clearCache(); + assertEquals(0, blockCache.getBlockTypeCountsForTest().size()); + Map<String, Long> metricsBefore = SchemaMetrics.getMetricsSnapshot(); + region.compactStores(); + LOG.debug("compactStores() returned"); + SchemaMetrics.validateMetricChanges(metricsBefore); + Map<String, Long> compactionMetrics = SchemaMetrics.diffMetrics( + metricsBefore, SchemaMetrics.getMetricsSnapshot()); + LOG.debug(SchemaMetrics.formatMetrics(compactionMetrics)); + Map<BlockType, Integer> blockTypesInCache = + blockCache.getBlockTypeCountsForTest(); + LOG.debug("Block types in cache: " + blockTypesInCache); + assertNull(blockTypesInCache.get(BlockType.DATA)); + region.close(); + blockCache.shutdown(); + } + }
