HBASE-14636 Clear HFileScannerImpl#prevBlocks periodically during the compaction flow.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c9523a56 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c9523a56 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c9523a56 Branch: refs/heads/hbase-12439 Commit: c9523a569d45e9edc2c2d7b8d4d9cbf05f46a100 Parents: 51693b9 Author: anoopsjohn <anoopsamj...@gmail.com> Authored: Tue Oct 20 13:06:09 2015 +0530 Committer: anoopsjohn <anoopsamj...@gmail.com> Committed: Tue Oct 20 13:06:09 2015 +0530 ---------------------------------------------------------------------- .../hadoop/hbase/io/hfile/HFileBlock.java | 7 ++++ .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 6 ++- .../regionserver/compactions/Compactor.java | 43 ++++++++++++++------ 3 files changed, 42 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c9523a56/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 4fd32a4..a68d0a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -1938,6 +1938,13 @@ public class HFileBlock implements Cacheable { } /** + * @return true if this block is backed by a shared memory area(such as that of a BucketCache). + */ + public boolean usesSharedMemory() { + return this.memType == MemoryType.SHARED; + } + + /** * Convert the contents of the block header into a human readable string. * This is mostly helpful for debugging. This assumes that the block * has minor version > 0. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/c9523a56/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 6970d27..5af72b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -510,14 +510,16 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { block.getOffset() == this.curBlock.getOffset()) { return; } - if (this.curBlock != null) { + // We don't have to keep ref to EXCLUSIVE type of block + if (this.curBlock != null && this.curBlock.usesSharedMemory()) { prevBlocks.add(this.curBlock); } this.curBlock = block; } void reset() { - if (this.curBlock != null) { + // We don't have to keep ref to EXCLUSIVE type of block + if (this.curBlock != null && this.curBlock.usesSharedMemory()) { this.prevBlocks.add(this.curBlock); } this.curBlock = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/c9523a56/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 873d827..eaccd0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import 
org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; @@ -58,13 +59,14 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; @InterfaceAudience.Private public abstract class Compactor { private static final Log LOG = LogFactory.getLog(Compactor.class); + private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000; protected CompactionProgress progress; protected Configuration conf; protected Store store; protected int compactionKVMax; protected Compression.Algorithm compactionCompression; - + /** specify how many days to keep MVCC values during major compaction **/ protected int keepSeqIdPeriod; @@ -272,12 +274,13 @@ public abstract class Compactor { protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, CompactionThroughputController throughputController, boolean major) throws IOException { - long bytesWritten = 0; - long bytesWrittenProgress = 0; + long bytesWrittenProgressForCloseCheck = 0; + long bytesWrittenProgressForLog = 0; + long bytesWrittenProgressForShippedCall = 0; // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. List<Cell> cells = new ArrayList<Cell>(); - long closeCheckInterval = HStore.getCloseCheckInterval(); + long closeCheckSizeLimit = HStore.getCloseCheckInterval(); long lastMillis = 0; if (LOG.isDebugEnabled()) { lastMillis = EnvironmentEdgeManager.currentTime(); @@ -289,6 +292,11 @@ public abstract class Compactor { ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); throughputController.start(compactionName); + KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? 
(KeyValueScanner)scanner : null; + int minFilesToCompact = Math.max(2, + conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, + /* old name */ conf.getInt("hbase.hstore.compactionThreshold", 3))); + long shippedCallSizeLimit = minFilesToCompact * HConstants.DEFAULT_BLOCKSIZE; try { do { hasMore = scanner.next(cells, scannerContext); @@ -304,35 +312,46 @@ public abstract class Compactor { int len = KeyValueUtil.length(c); ++progress.currentCompactedKVs; progress.totalCompactedSize += len; + bytesWrittenProgressForShippedCall += len; if (LOG.isDebugEnabled()) { - bytesWrittenProgress += len; + bytesWrittenProgressForLog += len; } throughputController.control(compactionName, len); // check periodically to see if a system stop is requested - if (closeCheckInterval > 0) { - bytesWritten += len; - if (bytesWritten > closeCheckInterval) { - bytesWritten = 0; + if (closeCheckSizeLimit > 0) { + bytesWrittenProgressForCloseCheck += len; + if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { + bytesWrittenProgressForCloseCheck = 0; if (!store.areWritesEnabled()) { progress.cancel(); return false; } } } + if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { + // The SHARED block references, being read for compaction, will be kept in prevBlocks + // list(See HFileScannerImpl#prevBlocks). In case of scan flow, after each set of cells + // being returned to client, we will call shipped() which can clear this list. Here by + // we are doing the similar thing. In between the compaction (after every N cells + // written with collective size of 'shippedCallSizeLimit') we will call shipped which + // may clear prevBlocks list. 
+ kvs.shipped(); + bytesWrittenProgressForShippedCall = 0; + } } // Log the progress of long running compactions every minute if // logging at DEBUG level if (LOG.isDebugEnabled()) { - if ((now - lastMillis) >= 60 * 1000) { + if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { LOG.debug("Compaction progress: " + compactionName + " " + progress - + String.format(", rate=%.2f kB/sec", (bytesWrittenProgress / 1024.0) + + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)) + ", throughputController is " + throughputController); lastMillis = now; - bytesWrittenProgress = 0; + bytesWrittenProgressForLog = 0; } } cells.clear();