http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index daad241..de41087 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.NavigableSet; import java.util.Optional; +import java.util.OptionalDouble; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; @@ -43,6 +44,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; +import java.util.function.ToLongFunction; +import java.util.stream.Collectors; +import java.util.stream.LongStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -171,7 +175,7 @@ public class HStore implements Store { private ScanInfo scanInfo; // TODO: ideally, this should be part of storeFileManager, as we keep passing this to it. - final List<StoreFile> filesCompacting = Lists.newArrayList(); + final List<HStoreFile> filesCompacting = Lists.newArrayList(); // All access must be synchronized. private final Set<ChangedReadersObserver> changedReaderObservers = @@ -335,7 +339,7 @@ public class HStore implements Store { * @param kvComparator KVComparator for storeFileManager. * @return StoreEngine to use. 
*/ - protected StoreEngine<?, ?, ?, ?> createStoreEngine(Store store, Configuration conf, + protected StoreEngine<?, ?, ?, ?> createStoreEngine(HStore store, Configuration conf, CellComparator kvComparator) throws IOException { return StoreEngine.create(store, conf, comparator); } @@ -517,12 +521,12 @@ public class HStore implements Store { * from the given directory. * @throws IOException */ - private List<StoreFile> loadStoreFiles() throws IOException { + private List<HStoreFile> loadStoreFiles() throws IOException { Collection<StoreFileInfo> files = fs.getStoreFiles(getColumnFamilyName()); return openStoreFiles(files); } - private List<StoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException { + private List<HStoreFile> openStoreFiles(Collection<StoreFileInfo> files) throws IOException { if (files == null || files.isEmpty()) { return new ArrayList<>(); } @@ -530,28 +534,21 @@ public class HStore implements Store { ThreadPoolExecutor storeFileOpenerThreadPool = this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-" + this.getColumnFamilyName()); - CompletionService<StoreFile> completionService = new ExecutorCompletionService<>(storeFileOpenerThreadPool); + CompletionService<HStoreFile> completionService = new ExecutorCompletionService<>(storeFileOpenerThreadPool); int totalValidStoreFile = 0; - for (final StoreFileInfo storeFileInfo: files) { + for (StoreFileInfo storeFileInfo : files) { // open each store file in parallel - completionService.submit(new Callable<StoreFile>() { - @Override - public StoreFile call() throws IOException { - StoreFile storeFile = createStoreFileAndReader(storeFileInfo); - return storeFile; - } - }); + completionService.submit(() -> this.createStoreFileAndReader(storeFileInfo)); totalValidStoreFile++; } - ArrayList<StoreFile> results = new ArrayList<>(files.size()); + ArrayList<HStoreFile> results = new ArrayList<>(files.size()); IOException ioe = null; try { for (int i = 0; i < 
totalValidStoreFile; i++) { try { - Future<StoreFile> future = completionService.take(); - StoreFile storeFile = future.get(); + HStoreFile storeFile = completionService.take().get(); if (storeFile != null) { long length = storeFile.getReader().length(); this.storeSize += length; @@ -574,9 +571,9 @@ public class HStore implements Store { // close StoreFile readers boolean evictOnClose = cacheConf != null? cacheConf.shouldEvictOnClose(): true; - for (StoreFile file : results) { + for (HStoreFile file : results) { try { - if (file != null) file.closeReader(evictOnClose); + if (file != null) file.closeStoreFile(evictOnClose); } catch (IOException e) { LOG.warn(e.getMessage()); } @@ -618,19 +615,18 @@ public class HStore implements Store { */ private void refreshStoreFilesInternal(Collection<StoreFileInfo> newFiles) throws IOException { StoreFileManager sfm = storeEngine.getStoreFileManager(); - Collection<StoreFile> currentFiles = sfm.getStorefiles(); - Collection<StoreFile> compactedFiles = sfm.getCompactedfiles(); + Collection<HStoreFile> currentFiles = sfm.getStorefiles(); + Collection<HStoreFile> compactedFiles = sfm.getCompactedfiles(); if (currentFiles == null) currentFiles = Collections.emptySet(); if (newFiles == null) newFiles = Collections.emptySet(); if (compactedFiles == null) compactedFiles = Collections.emptySet(); - HashMap<StoreFileInfo, StoreFile> currentFilesSet = new HashMap<>(currentFiles.size()); - for (StoreFile sf : currentFiles) { + HashMap<StoreFileInfo, HStoreFile> currentFilesSet = new HashMap<>(currentFiles.size()); + for (HStoreFile sf : currentFiles) { currentFilesSet.put(sf.getFileInfo(), sf); } - HashMap<StoreFileInfo, StoreFile> compactedFilesSet = - new HashMap<StoreFileInfo, StoreFile>(compactedFiles.size()); - for (StoreFile sf : compactedFiles) { + HashMap<StoreFileInfo, HStoreFile> compactedFilesSet = new HashMap<>(compactedFiles.size()); + for (HStoreFile sf : compactedFiles) { compactedFilesSet.put(sf.getFileInfo(), sf); } @@ 
-647,13 +643,13 @@ public class HStore implements Store { LOG.info("Refreshing store files for region " + this.getRegionInfo().getRegionNameAsString() + " files to add: " + toBeAddedFiles + " files to remove: " + toBeRemovedFiles); - Set<StoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size()); + Set<HStoreFile> toBeRemovedStoreFiles = new HashSet<>(toBeRemovedFiles.size()); for (StoreFileInfo sfi : toBeRemovedFiles) { toBeRemovedStoreFiles.add(currentFilesSet.get(sfi)); } // try to open the files - List<StoreFile> openedFiles = openStoreFiles(toBeAddedFiles); + List<HStoreFile> openedFiles = openStoreFiles(toBeAddedFiles); // propogate the file changes to the underlying store file manager replaceStoreFiles(toBeRemovedStoreFiles, openedFiles); //won't throw an exception @@ -668,14 +664,14 @@ public class HStore implements Store { completeCompaction(toBeRemovedStoreFiles); } - private StoreFile createStoreFileAndReader(final Path p) throws IOException { + private HStoreFile createStoreFileAndReader(final Path p) throws IOException { StoreFileInfo info = new StoreFileInfo(conf, this.getFileSystem(), p); return createStoreFileAndReader(info); } - private StoreFile createStoreFileAndReader(final StoreFileInfo info) throws IOException { + private HStoreFile createStoreFileAndReader(StoreFileInfo info) throws IOException { info.setRegionCoprocessorHost(this.region.getCoprocessorHost()); - StoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf, + HStoreFile storeFile = new HStoreFile(this.getFileSystem(), info, this.conf, this.cacheConf, this.family.getBloomFilterType(), isPrimaryReplicaStore()); storeFile.initReader(); return storeFile; @@ -734,12 +730,12 @@ public class HStore implements Store { * @return All store files. 
*/ @Override - public Collection<StoreFile> getStorefiles() { + public Collection<HStoreFile> getStorefiles() { return this.storeEngine.getStoreFileManager().getStorefiles(); } @Override - public Collection<StoreFile> getCompactedFiles() { + public Collection<HStoreFile> getCompactedFiles() { return this.storeEngine.getStoreFileManager().getCompactedfiles(); } @@ -756,19 +752,19 @@ public class HStore implements Store { isPrimaryReplicaStore(), conf); reader.loadFileInfo(); - byte[] firstKey = reader.getFirstRowKey(); - Preconditions.checkState(firstKey != null, "First key can not be null"); - Cell lk = reader.getLastKey(); - Preconditions.checkState(lk != null, "Last key can not be null"); - byte[] lastKey = CellUtil.cloneRow(lk); + Optional<byte[]> firstKey = reader.getFirstRowKey(); + Preconditions.checkState(firstKey.isPresent(), "First key can not be null"); + Optional<Cell> lk = reader.getLastKey(); + Preconditions.checkState(lk.isPresent(), "Last key can not be null"); + byte[] lastKey = CellUtil.cloneRow(lk.get()); - LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) + + LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey.get()) + " last=" + Bytes.toStringBinary(lastKey)); LOG.debug("Region bounds: first=" + Bytes.toStringBinary(getRegionInfo().getStartKey()) + " last=" + Bytes.toStringBinary(getRegionInfo().getEndKey())); - if (!this.getRegionInfo().containsRange(firstKey, lastKey)) { + if (!this.getRegionInfo().containsRange(firstKey.get(), lastKey)) { throw new WrongRegionException( "Bulk load file " + srcPath.toString() + " does not fit inside region " + this.getRegionInfo().getRegionNameAsString()); @@ -842,7 +838,7 @@ public class HStore implements Store { LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as " + dstPath + " - updating store file list."); - StoreFile sf = createStoreFileAndReader(dstPath); + HStoreFile sf = createStoreFileAndReader(dstPath); bulkLoadHFile(sf); 
LOG.info("Successfully loaded store file " + srcPath + " into store " + this @@ -852,11 +848,11 @@ public class HStore implements Store { } public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException { - StoreFile sf = createStoreFileAndReader(fileInfo); + HStoreFile sf = createStoreFileAndReader(fileInfo); bulkLoadHFile(sf); } - private void bulkLoadHFile(StoreFile sf) throws IOException { + private void bulkLoadHFile(HStoreFile sf) throws IOException { StoreFileReader r = sf.getReader(); this.storeSize += r.length(); this.totalUncompressedBytes += r.getTotalUncompressedBytes(); @@ -883,13 +879,13 @@ public class HStore implements Store { } @Override - public ImmutableCollection<StoreFile> close() throws IOException { + public ImmutableCollection<HStoreFile> close() throws IOException { this.archiveLock.lock(); this.lock.writeLock().lock(); try { // Clear so metrics doesn't find them. - ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles(); - Collection<StoreFile> compactedfiles = + ImmutableCollection<HStoreFile> result = storeEngine.getStoreFileManager().clearFiles(); + Collection<HStoreFile> compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles(); // clear the compacted files if (compactedfiles != null && !compactedfiles.isEmpty()) { @@ -904,13 +900,13 @@ public class HStore implements Store { // close each store file in parallel CompletionService<Void> completionService = new ExecutorCompletionService<>(storeFileCloserThreadPool); - for (final StoreFile f : result) { + for (HStoreFile f : result) { completionService.submit(new Callable<Void>() { @Override public Void call() throws IOException { boolean evictOnClose = cacheConf != null? 
cacheConf.shouldEvictOnClose(): true; - f.closeReader(evictOnClose); + f.closeStoreFile(evictOnClose); return null; } }); @@ -1012,20 +1008,20 @@ public class HStore implements Store { throw lastException; } - /* + /** * @param path The pathname of the tmp file into which the store was flushed * @param logCacheFlushId * @param status - * @return StoreFile created. + * @return store file created. * @throws IOException */ - private StoreFile commitFile(final Path path, final long logCacheFlushId, MonitoredTask status) + private HStoreFile commitFile(Path path, long logCacheFlushId, MonitoredTask status) throws IOException { // Write-out finished successfully, move into the right spot Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path); status.setStatus("Flushing " + this + ": reopening flushed file"); - StoreFile sf = createStoreFileAndReader(dstPath); + HStoreFile sf = createStoreFileAndReader(dstPath); StoreFileReader r = sf.getReader(); this.storeSize += r.length(); @@ -1041,35 +1037,32 @@ public class HStore implements Store { @Override public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, - boolean isCompaction, boolean includeMVCCReadpoint, - boolean includesTag) - throws IOException { + boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag) throws IOException { return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint, - includesTag, false); + includesTag, false); } - /* + /** * @param maxKeyCount * @param compression Compression algorithm to use * @param isCompaction whether we are creating a new file in a compaction - * @param includesMVCCReadPoint - whether to include MVCC or not + * @param includeMVCCReadpoint - whether to include MVCC or not * @param includesTag - includesTag or not * @return Writer for a new StoreFile in the tmp dir. 
*/ @Override public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, - boolean shouldDropBehind) - throws IOException { + boolean shouldDropBehind) throws IOException { return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint, - includesTag, shouldDropBehind, null); + includesTag, shouldDropBehind, null); } - /* + /** * @param maxKeyCount * @param compression Compression algorithm to use * @param isCompaction whether we are creating a new file in a compaction - * @param includesMVCCReadPoint - whether to include MVCC or not + * @param includeMVCCReadpoint - whether to include MVCC or not * @param includesTag - includesTag or not * @return Writer for a new StoreFile in the tmp dir. */ @@ -1078,8 +1071,7 @@ public class HStore implements Store { @Override public StoreFileWriter createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, - boolean shouldDropBehind, final TimeRangeTracker trt) - throws IOException { + boolean shouldDropBehind, final TimeRangeTracker trt) throws IOException { final CacheConfig writerCacheConf; if (isCompaction) { // Don't cache data on write on compactions. @@ -1133,15 +1125,18 @@ public class HStore implements Store { } - /* + private long getTotalSize(Collection<HStoreFile> sfs) { + return sfs.stream().mapToLong(sf -> sf.getReader().length()).sum(); + } + + /** * Change storeFiles adding into place the Reader produced by this new flush. * @param sfs Store files * @param snapshotId * @throws IOException * @return Whether compaction is required. 
*/ - private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId) - throws IOException { + private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws IOException { this.lock.writeLock().lock(); try { this.storeEngine.getStoreFileManager().insertNewFiles(sfs); @@ -1159,10 +1154,7 @@ public class HStore implements Store { // notify to be called here - only in case of flushes notifyChangedReadersObservers(sfs); if (LOG.isTraceEnabled()) { - long totalSize = 0; - for (StoreFile sf : sfs) { - totalSize += sf.getReader().length(); - } + long totalSize = getTotalSize(sfs); String traceMessage = "FLUSH time,count,size,store size,store files [" + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]"; @@ -1171,11 +1163,11 @@ public class HStore implements Store { return needsCompaction(); } - /* + /** * Notify all observers that set of Readers has changed. * @throws IOException */ - private void notifyChangedReadersObservers(List<StoreFile> sfs) throws IOException { + private void notifyChangedReadersObservers(List<HStoreFile> sfs) throws IOException { for (ChangedReadersObserver o : this.changedReaderObservers) { List<KeyValueScanner> memStoreScanners; this.lock.readLock().lock(); @@ -1190,13 +1182,39 @@ public class HStore implements Store { /** * Get all scanners with no filtering based on TTL (that happens further down the line). 
+ * @param cacheBlocks cache the blocks or not + * @param usePread true to use pread, false if not + * @param isCompaction true if the scanner is created for compaction + * @param matcher the scan query matcher + * @param startRow the start row + * @param stopRow the stop row + * @param readPt the read point of the current scan + * @return all scanners for this store + */ + public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, + boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt) + throws IOException { + return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, + readPt); + } + + /** + * Get all scanners with no filtering based on TTL (that happens further down the line). + * @param cacheBlocks cache the blocks or not + * @param usePread true to use pread, false if not + * @param isCompaction true if the scanner is created for compaction + * @param matcher the scan query matcher + * @param startRow the start row + * @param includeStartRow true to include start row, false if not + * @param stopRow the stop row + * @param includeStopRow true to include stop row, false if not + * @param readPt the read point of the current scan * @return all scanners for this store */ - @Override public List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt) throws IOException { - Collection<StoreFile> storeFilesToScan; + Collection<HStoreFile> storeFilesToScan; List<KeyValueScanner> memStoreScanners; this.lock.readLock().lock(); try { @@ -1221,8 +1239,45 @@ public class HStore implements Store { return scanners; } - @Override - public List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, + /** + * Create scanners on the given files and if needed on the memstore with no filtering 
based on TTL + * (that happens further down the line). + * @param files the list of files on which the scanners has to be created + * @param cacheBlocks cache the blocks or not + * @param usePread true to use pread, false if not + * @param isCompaction true if the scanner is created for compaction + * @param matcher the scan query matcher + * @param startRow the start row + * @param stopRow the stop row + * @param readPt the read point of the current scan + * @param includeMemstoreScanner true if memstore has to be included + * @return scanners on the given files and on the memstore if specified + */ + public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, + boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, + byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) + throws IOException { + return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, + false, readPt, includeMemstoreScanner); + } + + /** + * Create scanners on the given files and if needed on the memstore with no filtering based on TTL + * (that happens further down the line). 
+ * @param files the list of files on which the scanners has to be created + * @param cacheBlocks cache the blocks or not + * @param usePread true to use pread, false if not + * @param isCompaction true if the scanner is created for compaction + * @param matcher the scan query matcher + * @param startRow the start row + * @param includeStartRow true to include start row, false if not + * @param stopRow the stop row + * @param includeStopRow true to include stop row, false if not + * @param readPt the read point of the current scan + * @param includeMemstoreScanner true if memstore has to be included + * @return scanners on the given files and on the memstore if specified + */ + public List<KeyValueScanner> getScanners(List<HStoreFile> files, boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, boolean includeMemstoreScanner) throws IOException { @@ -1305,16 +1360,16 @@ public class HStore implements Store { * @return Storefile we compacted into or null if we failed or opted out early. 
*/ @Override - public List<StoreFile> compact(CompactionContext compaction, + public List<HStoreFile> compact(CompactionContext compaction, ThroughputController throughputController) throws IOException { return compact(compaction, throughputController, null); } @Override - public List<StoreFile> compact(CompactionContext compaction, + public List<HStoreFile> compact(CompactionContext compaction, ThroughputController throughputController, User user) throws IOException { assert compaction != null; - List<StoreFile> sfs = null; + List<HStoreFile> sfs = null; CompactionRequest cr = compaction.getRequest(); try { // Do all sanity checking in here if we have a valid CompactionRequest @@ -1322,7 +1377,7 @@ public class HStore implements Store { // block below long compactionStartTime = EnvironmentEdgeManager.currentTime(); assert compaction.hasSelection(); - Collection<StoreFile> filesToCompact = cr.getFiles(); + Collection<HStoreFile> filesToCompact = cr.getFiles(); assert !filesToCompact.isEmpty(); synchronized (filesCompacting) { // sanity check: we're compacting files that this store knows about @@ -1338,7 +1393,6 @@ public class HStore implements Store { // Commence the compaction. List<Path> newFiles = compaction.compact(throughputController, user); - long outputBytes = 0L; // TODO: get rid of this! if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) { LOG.warn("hbase.hstore.compaction.complete is set to false"); @@ -1347,8 +1401,8 @@ public class HStore implements Store { cacheConf != null? cacheConf.shouldEvictOnClose(): true; for (Path newFile : newFiles) { // Create storefile around what we wrote with a reader on it. 
- StoreFile sf = createStoreFileAndReader(newFile); - sf.closeReader(evictOnClose); + HStoreFile sf = createStoreFileAndReader(newFile); + sf.closeStoreFile(evictOnClose); sfs.add(sf); } return sfs; @@ -1364,10 +1418,7 @@ public class HStore implements Store { compactedCellsCount += getCompactionProgress().totalCompactingKVs; compactedCellsSize += getCompactionProgress().totalCompactedSize; } - - for (StoreFile sf : sfs) { - outputBytes += sf.getReader().length(); - } + long outputBytes = getTotalSize(sfs); // At this point the store will use new files for all new scanners. completeCompaction(filesToCompact); // update store size. @@ -1387,12 +1438,12 @@ public class HStore implements Store { } } - private List<StoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles, + private List<HStoreFile> moveCompatedFilesIntoPlace(CompactionRequest cr, List<Path> newFiles, User user) throws IOException { - List<StoreFile> sfs = new ArrayList<>(newFiles.size()); + List<HStoreFile> sfs = new ArrayList<>(newFiles.size()); for (Path newFile : newFiles) { assert newFile != null; - StoreFile sf = moveFileIntoPlace(newFile); + HStoreFile sf = moveFileIntoPlace(newFile); if (this.getCoprocessorHost() != null) { getCoprocessorHost().postCompact(this, sf, cr.getTracker(), user); } @@ -1403,7 +1454,7 @@ public class HStore implements Store { } // Package-visible for tests - StoreFile moveFileIntoPlace(final Path newFile) throws IOException { + HStoreFile moveFileIntoPlace(Path newFile) throws IOException { validateStoreFile(newFile); // Move the file into the right spot Path destPath = fs.commitStoreFile(getColumnFamilyName(), newFile); @@ -1415,17 +1466,15 @@ public class HStore implements Store { * @param filesCompacted Files compacted (input). * @param newFiles Files from compaction. 
*/ - private void writeCompactionWalRecord(Collection<StoreFile> filesCompacted, - Collection<StoreFile> newFiles) throws IOException { - if (region.getWAL() == null) return; - List<Path> inputPaths = new ArrayList<>(filesCompacted.size()); - for (StoreFile f : filesCompacted) { - inputPaths.add(f.getPath()); - } - List<Path> outputPaths = new ArrayList<>(newFiles.size()); - for (StoreFile f : newFiles) { - outputPaths.add(f.getPath()); + private void writeCompactionWalRecord(Collection<HStoreFile> filesCompacted, + Collection<HStoreFile> newFiles) throws IOException { + if (region.getWAL() == null) { + return; } + List<Path> inputPaths = + filesCompacted.stream().map(HStoreFile::getPath).collect(Collectors.toList()); + List<Path> outputPaths = + newFiles.stream().map(HStoreFile::getPath).collect(Collectors.toList()); HRegionInfo info = this.region.getRegionInfo(); CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info, family.getName(), inputPaths, outputPaths, fs.getStoreDir(getColumnFamilyDescriptor().getNameAsString())); @@ -1437,8 +1486,8 @@ public class HStore implements Store { } @VisibleForTesting - void replaceStoreFiles(final Collection<StoreFile> compactedFiles, - final Collection<StoreFile> result) throws IOException { + void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result) + throws IOException { this.lock.writeLock().lock(); try { this.storeEngine.getStoreFileManager().addCompactionResults(compactedFiles, result); @@ -1455,7 +1504,7 @@ public class HStore implements Store { * @param compactionStartTime Start time. */ private void logCompactionEndMessage( - CompactionRequest cr, List<StoreFile> sfs, long now, long compactionStartTime) { + CompactionRequest cr, List<HStoreFile> sfs, long now, long compactionStartTime) { StringBuilder message = new StringBuilder( "Completed" + (cr.isMajor() ? " major" : "") + " compaction of " + cr.getFiles().size() + (cr.isAllFiles() ? 
" (all)" : "") + " file(s) in " @@ -1463,7 +1512,7 @@ public class HStore implements Store { if (sfs.isEmpty()) { message.append("none, "); } else { - for (StoreFile sf: sfs) { + for (HStoreFile sf: sfs) { message.append(sf.getPath().getName()); message.append("(size="); message.append(TraditionalBinaryPrefix.long2String(sf.getReader().length(), "", 1)); @@ -1479,10 +1528,7 @@ public class HStore implements Store { LOG.info(message.toString()); if (LOG.isTraceEnabled()) { int fileCount = storeEngine.getStoreFileManager().getStorefileCount(); - long resultSize = 0; - for (StoreFile sf : sfs) { - resultSize += sf.getReader().length(); - } + long resultSize = getTotalSize(sfs); String traceMessage = "COMPACTION start,end,size out,files in,files out,store size," + "store files [" + compactionStartTime + "," + now + "," + resultSize + "," + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]"; @@ -1496,9 +1542,8 @@ public class HStore implements Store { * See HBASE-2231. 
* @param compaction */ - public void replayCompactionMarker(CompactionDescriptor compaction, - boolean pickCompactionFiles, boolean removeFiles) - throws IOException { + public void replayCompactionMarker(CompactionDescriptor compaction, boolean pickCompactionFiles, + boolean removeFiles) throws IOException { LOG.debug("Completing compaction from the WAL marker"); List<String> compactionInputs = compaction.getCompactionInputList(); List<String> compactionOutputs = Lists.newArrayList(compaction.getCompactionOutputList()); @@ -1525,23 +1570,23 @@ public class HStore implements Store { } //some of the input files might already be deleted - List<StoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size()); - for (StoreFile sf : this.getStorefiles()) { + List<HStoreFile> inputStoreFiles = new ArrayList<>(compactionInputs.size()); + for (HStoreFile sf : this.getStorefiles()) { if (inputFiles.contains(sf.getPath().getName())) { inputStoreFiles.add(sf); } } // check whether we need to pick up the new files - List<StoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size()); + List<HStoreFile> outputStoreFiles = new ArrayList<>(compactionOutputs.size()); if (pickCompactionFiles) { - for (StoreFile sf : this.getStorefiles()) { + for (HStoreFile sf : this.getStorefiles()) { compactionOutputs.remove(sf.getPath().getName()); } for (String compactionOutput : compactionOutputs) { StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), compactionOutput); - StoreFile storeFile = createStoreFileAndReader(storeFileInfo); + HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); outputStoreFiles.add(storeFile); } } @@ -1561,8 +1606,9 @@ public class HStore implements Store { * but instead makes a compaction candidate list by itself. * @param N Number of files. 
*/ + @VisibleForTesting public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException { - List<StoreFile> filesToCompact; + List<HStoreFile> filesToCompact; boolean isMajor; this.lock.readLock().lock(); @@ -1572,7 +1618,7 @@ public class HStore implements Store { if (!filesCompacting.isEmpty()) { // exclude all files older than the newest file we're currently // compacting. this allows us to preserve contiguity (HBASE-2856) - StoreFile last = filesCompacting.get(filesCompacting.size() - 1); + HStoreFile last = filesCompacting.get(filesCompacting.size() - 1); int idx = filesToCompact.indexOf(last); Preconditions.checkArgument(idx != -1); filesToCompact.subList(0, idx + 1).clear(); @@ -1598,11 +1644,11 @@ public class HStore implements Store { .compactForTesting(filesToCompact, isMajor); for (Path newFile: newFiles) { // Move the compaction into place. - StoreFile sf = moveFileIntoPlace(newFile); + HStoreFile sf = moveFileIntoPlace(newFile); if (this.getCoprocessorHost() != null) { this.getCoprocessorHost().postCompact(this, sf, null, null); } - replaceStoreFiles(filesToCompact, Lists.newArrayList(sf)); + replaceStoreFiles(filesToCompact, Collections.singletonList(sf)); completeCompaction(filesToCompact); } } finally { @@ -1624,7 +1670,7 @@ public class HStore implements Store { @Override public boolean isMajorCompaction() throws IOException { - for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) { + for (HStoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) { // TODO: what are these reader checks all over the place? if (sf.getReader() == null) { LOG.debug("StoreFile " + sf + " has null Reader"); @@ -1652,7 +1698,7 @@ public class HStore implements Store { synchronized (filesCompacting) { // First, see if coprocessor would want to override selection. 
if (this.getCoprocessorHost() != null) { - final List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); + final List<HStoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting); boolean override = false; override = getCoprocessorHost().preCompactSelection(this, candidatesForCoproc, tracker, user); @@ -1688,7 +1734,7 @@ public class HStore implements Store { } // Finally, we have the resulting files list. Check if we have any files at all. request = compaction.getRequest(); - Collection<StoreFile> selectedFiles = request.getFiles(); + Collection<HStoreFile> selectedFiles = request.getFiles(); if (selectedFiles.isEmpty()) { return Optional.empty(); } @@ -1716,7 +1762,7 @@ public class HStore implements Store { } /** Adds the files to compacting files. filesCompacting must be locked. */ - private void addToCompactingFiles(final Collection<StoreFile> filesToAdd) { + private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) { if (filesToAdd == null) return; // Check that we do not try to compact the same StoreFile twice. if (!Collections.disjoint(filesCompacting, filesToAdd)) { @@ -1734,7 +1780,7 @@ public class HStore implements Store { return; } this.lock.readLock().lock(); - Collection<StoreFile> delSfs = null; + Collection<HStoreFile> delSfs = null; try { synchronized (filesCompacting) { long cfTtl = getStoreFileTtl(); @@ -1749,7 +1795,7 @@ public class HStore implements Store { } if (delSfs == null || delSfs.isEmpty()) return; - Collection<StoreFile> newFiles = new ArrayList<>(); // No new files. + Collection<HStoreFile> newFiles = new ArrayList<>(); // No new files. writeCompactionWalRecord(delSfs, newFiles); replaceStoreFiles(delSfs, newFiles); completeCompaction(delSfs); @@ -1775,23 +1821,20 @@ public class HStore implements Store { } /** - * Validates a store file by opening and closing it. In HFileV2 this should - * not be an expensive operation. - * + * Validates a store file by opening and closing it. 
In HFileV2 this should not be an expensive + * operation. * @param path the path to the store file */ - private void validateStoreFile(Path path) - throws IOException { - StoreFile storeFile = null; + private void validateStoreFile(Path path) throws IOException { + HStoreFile storeFile = null; try { storeFile = createStoreFileAndReader(path); } catch (IOException e) { - LOG.error("Failed to open store file : " + path - + ", keeping it in tmp location", e); + LOG.error("Failed to open store file : " + path + ", keeping it in tmp location", e); throw e; } finally { if (storeFile != null) { - storeFile.closeReader(false); + storeFile.closeStoreFile(false); } } } @@ -1811,11 +1854,11 @@ public class HStore implements Store { * @param compactedFiles list of files that were compacted */ @VisibleForTesting - protected void completeCompaction(final Collection<StoreFile> compactedFiles) + protected void completeCompaction(Collection<HStoreFile> compactedFiles) throws IOException { this.storeSize = 0L; this.totalUncompressedBytes = 0L; - for (StoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { + for (HStoreFile hsf : this.storeEngine.getStoreFileManager().getStorefiles()) { StoreFileReader r = hsf.getReader(); if (r == null) { LOG.warn("StoreFile " + hsf + " has a null Reader"); @@ -1857,7 +1900,7 @@ public class HStore implements Store { } @Override - public byte[] getSplitPoint() { + public Optional<byte[]> getSplitPoint() { this.lock.readLock().lock(); try { // Should already be enforced by the split policy! 
@@ -1867,7 +1910,7 @@ public class HStore implements Store { if (LOG.isTraceEnabled()) { LOG.trace("Not splittable; has references: " + this); } - return null; + return Optional.empty(); } return this.storeEngine.getStoreFileManager().getSplitPoint(); } catch(IOException e) { @@ -1875,7 +1918,7 @@ public class HStore implements Store { } finally { this.lock.readLock().unlock(); } - return null; + return Optional.empty(); } @Override @@ -1924,24 +1967,39 @@ public class HStore implements Store { return scanner; } - @Override + /** + * Recreates the scanners on the current list of active store file scanners + * @param currentFileScanners the current set of active store file scanners + * @param cacheBlocks cache the blocks or not + * @param usePread use pread or not + * @param isCompaction is the scanner for compaction + * @param matcher the scan query matcher + * @param startRow the scan's start row + * @param includeStartRow should the scan include the start row + * @param stopRow the scan's stop row + * @param includeStopRow should the scan include the stop row + * @param readPt the read point of the current scane + * @param includeMemstoreScanner whether the current scanner should include memstorescanner + * @return list of scanners recreated on the current Scanners + * @throws IOException + */ public List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners, boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, boolean includeMemstoreScanner) throws IOException { this.lock.readLock().lock(); try { - Map<String, StoreFile> name2File = + Map<String, HStoreFile> name2File = new HashMap<>(getStorefilesCount() + getCompactedFilesCount()); - for (StoreFile file : getStorefiles()) { + for (HStoreFile file : getStorefiles()) { name2File.put(file.getFileInfo().getActiveFileName(), file); } if (getCompactedFiles() != null) { 
- for (StoreFile file : getCompactedFiles()) { + for (HStoreFile file : getCompactedFiles()) { name2File.put(file.getFileInfo().getActiveFileName(), file); } } - List<StoreFile> filesToReopen = new ArrayList<>(); + List<HStoreFile> filesToReopen = new ArrayList<>(); for (KeyValueScanner kvs : currentFileScanners) { assert kvs.isFileScanner(); if (kvs.peek() == null) { @@ -1974,87 +2032,45 @@ public class HStore implements Store { return this.storeEngine.getStoreFileManager().getCompactedFilesCount(); } + private LongStream getStoreFileCreatedTimestampStream() { + return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> { + if (sf.getReader() == null) { + LOG.warn("StoreFile " + sf + " has a null Reader"); + return false; + } else { + return true; + } + }).filter(HStoreFile::isHFile).mapToLong(sf -> sf.getFileInfo().getCreatedTimestamp()); + } + @Override public long getMaxStoreFileAge() { - long earliestTS = Long.MAX_VALUE; - for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) { - StoreFileReader r = s.getReader(); - if (r == null) { - LOG.warn("StoreFile " + s + " has a null Reader"); - continue; - } - if (!s.isHFile()) { - continue; - } - long createdTS = s.getFileInfo().getCreatedTimestamp(); - earliestTS = (createdTS < earliestTS) ? createdTS : earliestTS; - } - long now = EnvironmentEdgeManager.currentTime(); - return now - earliestTS; + return EnvironmentEdgeManager.currentTime() - + getStoreFileCreatedTimestampStream().min().orElse(Long.MAX_VALUE); } @Override public long getMinStoreFileAge() { - long latestTS = 0; - for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) { - StoreFileReader r = s.getReader(); - if (r == null) { - LOG.warn("StoreFile " + s + " has a null Reader"); - continue; - } - if (!s.isHFile()) { - continue; - } - long createdTS = s.getFileInfo().getCreatedTimestamp(); - latestTS = (createdTS > latestTS) ? 
createdTS : latestTS; - } - long now = EnvironmentEdgeManager.currentTime(); - return now - latestTS; + return EnvironmentEdgeManager.currentTime() - + getStoreFileCreatedTimestampStream().max().orElse(0L); } @Override public long getAvgStoreFileAge() { - long sum = 0, count = 0; - for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) { - StoreFileReader r = s.getReader(); - if (r == null) { - LOG.warn("StoreFile " + s + " has a null Reader"); - continue; - } - if (!s.isHFile()) { - continue; - } - sum += s.getFileInfo().getCreatedTimestamp(); - count++; - } - if (count == 0) { - return 0; - } - long avgTS = sum / count; - long now = EnvironmentEdgeManager.currentTime(); - return now - avgTS; + OptionalDouble avg = getStoreFileCreatedTimestampStream().average(); + return avg.isPresent() ? EnvironmentEdgeManager.currentTime() - (long) avg.getAsDouble() : 0L; } @Override public long getNumReferenceFiles() { - long numRefFiles = 0; - for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) { - if (s.isReference()) { - numRefFiles++; - } - } - return numRefFiles; + return this.storeEngine.getStoreFileManager().getStorefiles().stream() + .filter(HStoreFile::isReference).count(); } @Override public long getNumHFiles() { - long numHFiles = 0; - for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) { - if (s.isHFile()) { - numHFiles++; - } - } - return numHFiles; + return this.storeEngine.getStoreFileManager().getStorefiles().stream() + .filter(HStoreFile::isHFile).count(); } @Override @@ -2074,59 +2090,41 @@ public class HStore implements Store { return getStorefilesSize(storeFile -> storeFile.isHFile()); } - private long getStorefilesSize(Predicate<StoreFile> predicate) { - long size = 0; - for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) { - StoreFileReader r = s.getReader(); - if (r == null) { - LOG.warn("StoreFile " + s + " has a null Reader"); - continue; + private long 
getStorefilesSize(Predicate<HStoreFile> predicate) { + return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> { + if (sf.getReader() == null) { + LOG.warn("StoreFile " + sf + " has a null Reader"); + return false; + } else { + return true; } - if (predicate.test(s)) { - size += r.length(); + }).filter(predicate).mapToLong(sf -> sf.getReader().length()).sum(); + } + + private long getStoreFileFieldSize(ToLongFunction<StoreFileReader> f) { + return this.storeEngine.getStoreFileManager().getStorefiles().stream().filter(sf -> { + if (sf.getReader() == null) { + LOG.warn("StoreFile " + sf + " has a null Reader"); + return false; + } else { + return true; } - } - return size; + }).map(HStoreFile::getReader).mapToLong(f).sum(); } @Override public long getStorefilesIndexSize() { - long size = 0; - for (StoreFile s: this.storeEngine.getStoreFileManager().getStorefiles()) { - StoreFileReader r = s.getReader(); - if (r == null) { - LOG.warn("StoreFile " + s + " has a null Reader"); - continue; - } - size += r.indexSize(); - } - return size; + return getStoreFileFieldSize(StoreFileReader::indexSize); } @Override public long getTotalStaticIndexSize() { - long size = 0; - for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) { - StoreFileReader r = s.getReader(); - if (r == null) { - continue; - } - size += r.getUncompressedDataIndexSize(); - } - return size; + return getStoreFileFieldSize(StoreFileReader::getUncompressedDataIndexSize); } @Override public long getTotalStaticBloomSize() { - long size = 0; - for (StoreFile s : this.storeEngine.getStoreFileManager().getStorefiles()) { - StoreFileReader r = s.getReader(); - if (r == null) { - continue; - } - size += r.getTotalBloomSize(); - } - return size; + return getStoreFileFieldSize(StoreFileReader::getTotalBloomSize); } @Override @@ -2247,19 +2245,19 @@ public class HStore implements Store { if (this.tempFiles == null || this.tempFiles.isEmpty()) { return false; } - 
List<StoreFile> storeFiles = new ArrayList<>(this.tempFiles.size()); + List<HStoreFile> storeFiles = new ArrayList<>(this.tempFiles.size()); for (Path storeFilePath : tempFiles) { try { - StoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); + HStoreFile sf = HStore.this.commitFile(storeFilePath, cacheFlushSeqNum, status); outputFileSize += sf.getReader().length(); storeFiles.add(sf); } catch (IOException ex) { LOG.error("Failed to commit store file " + storeFilePath, ex); // Try to delete the files we have committed before. - for (StoreFile sf : storeFiles) { + for (HStoreFile sf : storeFiles) { Path pathToDelete = sf.getPath(); try { - sf.deleteReader(); + sf.deleteStoreFile(); } catch (IOException deleteEx) { LOG.fatal("Failed to delete store file we committed, halting " + pathToDelete, ex); Runtime.getRuntime().halt(1); @@ -2269,7 +2267,7 @@ public class HStore implements Store { } } - for (StoreFile sf : storeFiles) { + for (HStoreFile sf : storeFiles) { if (HStore.this.getCoprocessorHost() != null) { HStore.this.getCoprocessorHost().postFlush(HStore.this, sf); } @@ -2305,11 +2303,11 @@ public class HStore implements Store { @Override public void replayFlush(List<String> fileNames, boolean dropMemstoreSnapshot) throws IOException { - List<StoreFile> storeFiles = new ArrayList<>(fileNames.size()); + List<HStoreFile> storeFiles = new ArrayList<>(fileNames.size()); for (String file : fileNames) { // open the file as a store file (hfile link, etc) StoreFileInfo storeFileInfo = fs.getStoreFileInfo(getColumnFamilyName(), file); - StoreFile storeFile = createStoreFileAndReader(storeFileInfo); + HStoreFile storeFile = createStoreFileAndReader(storeFileInfo); storeFiles.add(storeFile); HStore.this.storeSize += storeFile.getReader().length(); HStore.this.totalUncompressedBytes += storeFile.getReader().getTotalUncompressedBytes(); @@ -2498,9 +2496,9 @@ public class HStore implements Store { archiveLock.lock(); try { lock.readLock().lock(); - 
Collection<StoreFile> copyCompactedfiles = null; + Collection<HStoreFile> copyCompactedfiles = null; try { - Collection<StoreFile> compactedfiles = + Collection<HStoreFile> compactedfiles = this.getStoreEngine().getStoreFileManager().getCompactedfiles(); if (compactedfiles != null && compactedfiles.size() != 0) { // Do a copy under read lock @@ -2527,10 +2525,10 @@ public class HStore implements Store { * @param compactedfiles The compacted files in this store that are not active in reads * @throws IOException */ - private void removeCompactedfiles(Collection<StoreFile> compactedfiles) + private void removeCompactedfiles(Collection<HStoreFile> compactedfiles) throws IOException { - final List<StoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); - for (final StoreFile file : compactedfiles) { + final List<HStoreFile> filesToRemove = new ArrayList<>(compactedfiles.size()); + for (final HStoreFile file : compactedfiles) { synchronized (file) { try { StoreFileReader r = file.getReader(); @@ -2573,7 +2571,7 @@ public class HStore implements Store { // files which were successfully archived. Otherwise we will receive a // FileNotFoundException when we attempt to re-archive them in the next go around. Collection<Path> failedFiles = fae.getFailedFiles(); - Iterator<StoreFile> iter = filesToRemove.iterator(); + Iterator<HStoreFile> iter = filesToRemove.iterator(); while (iter.hasNext()) { if (failedFiles.contains(iter.next().getPath())) { iter.remove(); @@ -2601,7 +2599,7 @@ public class HStore implements Store { return this.memstore.isSloppy(); } - private void clearCompactedfiles(final List<StoreFile> filesToRemove) throws IOException { + private void clearCompactedfiles(List<HStoreFile> filesToRemove) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("Clearing the compacted file " + filesToRemove + " from this store"); }
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java index c43b788..a79af13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java @@ -20,8 +20,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.Collections; -import java.util.Comparator; import java.util.Map; +import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -32,14 +32,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HDFSBlocksDistribution; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockType; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.util.BloomFilterFactory; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; @@ -61,8 +62,50 @@ public class HStoreFile implements StoreFile { private static final Log LOG = LogFactory.getLog(HStoreFile.class.getName()); + public static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead"; + private static final boolean DEFAULT_STORE_FILE_READER_NO_READAHEAD = 
false; + // Keys for fileinfo values in HFile + + /** Max Sequence ID in FileInfo */ + public static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); + + /** Major compaction flag in FileInfo */ + public static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY"); + + /** Minor compaction flag in FileInfo */ + public static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY = + Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION"); + + /** Bloom filter Type in FileInfo */ + public static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE"); + + /** Delete Family Count in FileInfo */ + public static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT"); + + /** Last Bloom filter key in FileInfo */ + public static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY"); + + /** Key for Timerange information in metadata */ + public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE"); + + /** Key for timestamp of earliest-put in metadata */ + public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS"); + + /** Key for the number of mob cells in metadata */ + public static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT"); + + /** Meta key set when store file is a result of a bulk load */ + public static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK"); + public static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP"); + + /** + * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets + * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped. 
+ */ + public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID"); + private final StoreFileInfo fileInfo; private final FileSystem fs; @@ -90,29 +133,28 @@ public class HStoreFile implements StoreFile { private long maxMemstoreTS = -1; // firstKey, lastkey and cellComparator will be set when openReader. - private Cell firstKey; + private Optional<Cell> firstKey; - private Cell lastKey; + private Optional<Cell> lastKey; - private Comparator<Cell> comparator; + private CellComparator comparator; - @Override public CacheConfig getCacheConf() { return cacheConf; } @Override - public Cell getFirstKey() { + public Optional<Cell> getFirstKey() { return firstKey; } @Override - public Cell getLastKey() { + public Optional<Cell> getLastKey() { return lastKey; } @Override - public Comparator<Cell> getComparator() { + public CellComparator getComparator() { return comparator; } @@ -155,27 +197,6 @@ public class HStoreFile implements StoreFile { * configuration. This may or may not be the same as the Bloom filter type actually * present in the HFile, because column family configuration might change. If this is * {@link BloomType#NONE}, the existing Bloom filter is ignored. - * @deprecated Now we will specific whether the StoreFile is for primary replica when - * constructing, so please use {@link #HStoreFile(FileSystem, Path, Configuration, - * CacheConfig, BloomType, boolean)} directly. - */ - @Deprecated - public HStoreFile(final FileSystem fs, final Path p, final Configuration conf, - final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException { - this(fs, new StoreFileInfo(conf, fs, p), conf, cacheConf, cfBloomType); - } - - /** - * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram - * depending on the underlying files (10-20MB?). - * @param fs The current file system to use. - * @param p The path of the file. - * @param conf The current configuration. 
- * @param cacheConf The cache configuration and block cache reference. - * @param cfBloomType The bloom type to use for this store file as specified by column family - * configuration. This may or may not be the same as the Bloom filter type actually - * present in the HFile, because column family configuration might change. If this is - * {@link BloomType#NONE}, the existing Bloom filter is ignored. * @param primaryReplica true if this is a store file for primary replica, otherwise false. * @throws IOException */ @@ -187,27 +208,6 @@ public class HStoreFile implements StoreFile { /** * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram * depending on the underlying files (10-20MB?). - * @param fs The current file system to use. - * @param fileInfo The store file information. - * @param conf The current configuration. - * @param cacheConf The cache configuration and block cache reference. - * @param cfBloomType The bloom type to use for this store file as specified by column family - * configuration. This may or may not be the same as the Bloom filter type actually - * present in the HFile, because column family configuration might change. If this is - * {@link BloomType#NONE}, the existing Bloom filter is ignored. - * @deprecated Now we will specific whether the StoreFile is for primary replica when - * constructing, so please use {@link #HStoreFile(FileSystem, StoreFileInfo, - * Configuration, CacheConfig, BloomType, boolean)} directly. - */ - @Deprecated - public HStoreFile(final FileSystem fs, final StoreFileInfo fileInfo, final Configuration conf, - final CacheConfig cacheConf, final BloomType cfBloomType) throws IOException { - this(fs, fileInfo, conf, cacheConf, cfBloomType, true); - } - - /** - * Constructor, loads a reader and it's indices, etc. May allocate a substantial amount of ram - * depending on the underlying files (10-20MB?). * @param fs fs The current file system to use. 
* @param fileInfo The store file information. * @param conf The current configuration. @@ -235,7 +235,10 @@ public class HStoreFile implements StoreFile { this.primaryReplica = primaryReplica; } - @Override + /** + * @return the StoreFile object associated to this StoreFile. null if the StoreFile is not a + * reference. + */ public StoreFileInfo getFileInfo() { return this.fileInfo; } @@ -283,7 +286,11 @@ public class HStoreFile implements StoreFile { return fileInfo.getModificationTime(); } - @Override + /** + * Only used by the Striped Compaction Policy + * @param key + * @return value associated with the metadata key + */ public byte[] getMetadataValue(byte[] key) { return metadataMap.get(key); } @@ -299,7 +306,6 @@ public class HStoreFile implements StoreFile { return bulkLoadedHFile || (metadataMap != null && metadataMap.containsKey(BULKLOAD_TIME_KEY)); } - @Override public boolean isCompactedAway() { return compactedAway; } @@ -309,7 +315,9 @@ public class HStoreFile implements StoreFile { return refCount.get(); } - @Override + /** + * @return true if the file is still used in reads + */ public boolean isReferencedInReads() { int rc = refCount.get(); assert rc >= 0; // we should not go negative. @@ -331,7 +339,7 @@ public class HStoreFile implements StoreFile { /** * Opens reader on this store file. Called by Constructor. * @throws IOException - * @see #closeReader(boolean) + * @see #closeStoreFile(boolean) */ private void open() throws IOException { if (this.reader != null) { @@ -440,7 +448,9 @@ public class HStoreFile implements StoreFile { comparator = reader.getComparator(); } - @Override + /** + * Initialize the reader used for pread. + */ public void initReader() throws IOException { if (reader == null) { try { @@ -448,7 +458,7 @@ public class HStoreFile implements StoreFile { } catch (Exception e) { try { boolean evictOnClose = cacheConf != null ? 
cacheConf.shouldEvictOnClose() : true; - this.closeReader(evictOnClose); + this.closeStoreFile(evictOnClose); } catch (IOException ee) { LOG.warn("failed to close reader", ee); } @@ -465,14 +475,22 @@ public class HStoreFile implements StoreFile { return reader; } - @Override + /** + * Get a scanner which uses pread. + * <p> + * Must be called after initReader. + */ public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) { return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder, canOptimizeForNonNullColumn); } - @Override + /** + * Get a scanner which uses streaming read. + * <p> + * Must be called after initReader. + */ public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) throws IOException { @@ -480,31 +498,37 @@ public class HStoreFile implements StoreFile { isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); } - @Override + /** + * @return Current reader. Must call initReader first else returns null. + * @see #initReader() + */ public StoreFileReader getReader() { return this.reader; } - @Override - public synchronized void closeReader(boolean evictOnClose) - throws IOException { + /** + * @param evictOnClose whether to evict blocks belonging to this file + * @throws IOException + */ + public synchronized void closeStoreFile(boolean evictOnClose) throws IOException { if (this.reader != null) { this.reader.close(evictOnClose); this.reader = null; } } - @Override - public void markCompactedAway() { - this.compactedAway = true; + /** + * Delete this file + * @throws IOException + */ + public void deleteStoreFile() throws IOException { + boolean evictOnClose = cacheConf != null ? 
cacheConf.shouldEvictOnClose() : true; + closeStoreFile(evictOnClose); + this.fs.delete(getPath(), true); } - @Override - public void deleteReader() throws IOException { - boolean evictOnClose = - cacheConf != null? cacheConf.shouldEvictOnClose(): true; - closeReader(evictOnClose); - this.fs.delete(getPath(), true); + public void markCompactedAway() { + this.compactedAway = true; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index 4528517..8af33b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -246,30 +246,27 @@ public class MemStoreCompactor { MemStoreSegmentsIterator iterator = null; switch (action) { - case COMPACT: - iterator = - new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(), - compactingMemStore.getComparator(), - compactionKVMax, compactingMemStore.getStore()); + case COMPACT: + iterator = new MemStoreCompactorSegmentsIterator(versionedList.getStoreSegments(), + compactingMemStore.getComparator(), compactionKVMax, compactingMemStore.getStore()); - result = SegmentFactory.instance().createImmutableSegmentByCompaction( + result = SegmentFactory.instance().createImmutableSegmentByCompaction( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, versionedList.getNumOfCells(), compactingMemStore.getIndexType()); - iterator.close(); - break; - case MERGE: - iterator = - new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), - compactingMemStore.getComparator(), - compactionKVMax); + 
iterator.close(); + break; + case MERGE: + iterator = new MemStoreMergerSegmentsIterator(versionedList.getStoreSegments(), + compactingMemStore.getComparator(), compactionKVMax); - result = SegmentFactory.instance().createImmutableSegmentByMerge( + result = SegmentFactory.instance().createImmutableSegmentByMerge( compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator, versionedList.getNumOfCells(), versionedList.getStoreSegments(), compactingMemStore.getIndexType()); - iterator.close(); - break; - default: throw new RuntimeException("Unknown action " + action); // sanity check + iterator.close(); + break; + default: + throw new RuntimeException("Unknown action " + action); // sanity check } return result; http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java index 3d88955..b3ba998 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java @@ -45,10 +45,8 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator private StoreScanner compactingScanner; // C-tor - public MemStoreCompactorSegmentsIterator( - List<ImmutableSegment> segments, - CellComparator comparator, int compactionKVMax, Store store - ) throws IOException { + public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments, + CellComparator comparator, int compactionKVMax, HStore store) throws IOException { super(compactionKVMax); List<KeyValueScanner> scanners = new 
ArrayList<KeyValueScanner>(); @@ -108,7 +106,7 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator * Creates the scanner for compacting the pipeline. * @return the scanner */ - private StoreScanner createScanner(Store store, List<KeyValueScanner> scanners) + private StoreScanner createScanner(HStore store, List<KeyValueScanner> scanners) throws IOException { // Get all available versions return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(Integer.MAX_VALUE), scanners, http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java index 9bdeedc..b9f9af8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MobStoreScanner.java @@ -23,9 +23,9 @@ import java.util.List; import java.util.NavigableSet; import org.apache.hadoop.hbase.Cell; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.yetus.audience.InterfaceAudience; /** * Scanner scans both the memstore and the MOB Store. 
Coalesce KeyValue stream into @@ -39,7 +39,7 @@ public class MobStoreScanner extends StoreScanner { private boolean readEmptyValueOnMobCellMiss = false; private final HMobStore mobStore; - public MobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + public MobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns, long readPt) throws IOException { super(store, scanInfo, scan, columns, readPt); cacheMobBlocks = MobUtils.isCacheMobBlocks(scan); http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 044c4dc..fe0f30e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -499,7 +499,7 @@ public class RegionCoprocessorHost * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, * InternalScanner, CompactionLifeCycleTracker, long)} */ - public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners, + public InternalScanner preCompactScannerOpen(HStore store, List<StoreFileScanner> scanners, ScanType scanType, long earliestPutTs, CompactionLifeCycleTracker tracker, User user, long readPoint) throws IOException { return execOperationWithResult(null, @@ -514,7 +514,7 @@ public class RegionCoprocessorHost } /** - * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently + * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently * available candidates. 
* @param store The store where compaction is being requested * @param candidates The currently available store files @@ -522,7 +522,7 @@ public class RegionCoprocessorHost * @return If {@code true}, skip the normal selection process and use the current list * @throws IOException */ - public boolean preCompactSelection(Store store, List<StoreFile> candidates, + public boolean preCompactSelection(HStore store, List<HStoreFile> candidates, CompactionLifeCycleTracker tracker, User user) throws IOException { return execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { @Override @@ -534,13 +534,13 @@ public class RegionCoprocessorHost } /** - * Called after the {@link StoreFile}s to be compacted have been selected from the available + * Called after the {@link HStoreFile}s to be compacted have been selected from the available * candidates. * @param store The store where compaction is being requested * @param selected The store files selected to compact * @param tracker used to track the life cycle of a compaction */ - public void postCompactSelection(Store store, ImmutableList<StoreFile> selected, + public void postCompactSelection(HStore store, ImmutableList<HStoreFile> selected, CompactionLifeCycleTracker tracker, User user) throws IOException { execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { @Override @@ -559,7 +559,7 @@ public class RegionCoprocessorHost * @param tracker used to track the life cycle of a compaction * @throws IOException */ - public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType, + public InternalScanner preCompact(HStore store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, User user) throws IOException { return execOperationWithResult(false, scanner, coprocessors.isEmpty() ? 
null : new RegionOperationWithResult<InternalScanner>(user) { @@ -578,7 +578,7 @@ public class RegionCoprocessorHost * @param tracker used to track the life cycle of a compaction * @throws IOException */ - public void postCompact(Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker, + public void postCompact(HStore store, HStoreFile resultFile, CompactionLifeCycleTracker tracker, User user) throws IOException { execOperation(coprocessors.isEmpty() ? null : new RegionOperation(user) { @Override @@ -593,7 +593,7 @@ public class RegionCoprocessorHost * Invoked before a memstore flush * @throws IOException */ - public InternalScanner preFlush(final Store store, final InternalScanner scanner) + public InternalScanner preFlush(HStore store, final InternalScanner scanner) throws IOException { return execOperationWithResult(false, scanner, coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { @@ -623,16 +623,16 @@ public class RegionCoprocessorHost * See * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} */ - public InternalScanner preFlushScannerOpen(final Store store, - final List<KeyValueScanner> scanners, final long readPoint) throws IOException { + public InternalScanner preFlushScannerOpen(HStore store, List<KeyValueScanner> scanners, + long readPoint) throws IOException { return execOperationWithResult(null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { - @Override - public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) - throws IOException { - setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint)); - } - }); + coprocessors.isEmpty() ? 
null : new RegionOperationWithResult<InternalScanner>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preFlushScannerOpen(ctx, store, scanners, getResult(), readPoint)); + } + }); } /** @@ -653,7 +653,7 @@ public class RegionCoprocessorHost * Invoked after a memstore flush * @throws IOException */ - public void postFlush(final Store store, final StoreFile storeFile) throws IOException { + public void postFlush(HStore store, HStoreFile storeFile) throws IOException { execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { @Override public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) @@ -1136,16 +1136,16 @@ public class RegionCoprocessorHost * See * {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner, long)} */ - public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan, - final NavigableSet<byte[]> targetCols, final long readPt) throws IOException { + public KeyValueScanner preStoreScannerOpen(HStore store, Scan scan, + NavigableSet<byte[]> targetCols, long readPt) throws IOException { return execOperationWithResult(null, - coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() { - @Override - public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) - throws IOException { - setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult(), readPt)); - } - }); + coprocessors.isEmpty() ? 
null : new RegionOperationWithResult<KeyValueScanner>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult(), readPt)); + } + }); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java index 5ccd6e3..71b7b9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionSplitPolicy.java @@ -19,14 +19,15 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.List; +import java.util.Optional; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; @@ -80,12 +81,12 @@ public abstract class RegionSplitPolicy extends Configured { byte[] splitPointFromLargestStore = null; long largestStoreSize = 0; - for (Store s : stores) { - byte[] splitPoint = s.getSplitPoint(); + for (HStore s : stores) { + Optional<byte[]> splitPoint = s.getSplitPoint(); // Store also returns null if it has references as way of indicating it is not splittable long storeSize = 
s.getSize(); - if (splitPoint != null && largestStoreSize < storeSize) { - splitPointFromLargestStore = splitPoint; + if (splitPoint.isPresent() && largestStoreSize < storeSize) { + splitPointFromLargestStore = splitPoint.get(); largestStoreSize = storeSize; } } @@ -131,7 +132,7 @@ public abstract class RegionSplitPolicy extends Configured { /** * In {@link HRegionFileSystem#splitStoreFile(org.apache.hadoop.hbase.HRegionInfo, String, - * StoreFile, byte[], boolean, RegionSplitPolicy)} we are not creating the split reference + * HStoreFile, byte[], boolean, RegionSplitPolicy)} we are not creating the split reference * if split row not lies in the StoreFile range. But in some use cases we may need to create * the split reference even when the split row not lies in the range. This method can be used * to decide, whether to skip the the StoreFile range check or not. http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java index bfe20ba..d64c372 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedMobStoreScanner.java @@ -23,9 +23,9 @@ import java.util.List; import java.util.NavigableSet; import org.apache.hadoop.hbase.Cell; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.yetus.audience.InterfaceAudience; /** * ReversedMobStoreScanner extends from ReversedStoreScanner, and is used to support @@ -40,7 +40,7 @@ public class ReversedMobStoreScanner extends 
ReversedStoreScanner { private boolean readEmptyValueOnMobCellMiss = false; protected final HMobStore mobStore; - ReversedMobStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, + ReversedMobStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, long readPt) throws IOException { super(store, scanInfo, scan, columns, readPt); cacheMobBlocks = MobUtils.isCacheMobBlocks(scan); http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 04e77e9..0089d3f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -22,12 +22,12 @@ import java.io.IOException; import java.util.List; import java.util.NavigableSet; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; +import org.apache.yetus.audience.InterfaceAudience; /** * ReversedStoreScanner extends from StoreScanner, and is used to support @@ -46,7 +46,7 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { * @param columns which columns we are scanning * @throws IOException */ - ReversedStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + ReversedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, long readPt) throws IOException { super(store, scanInfo, scan, columns, readPt);