http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index b7e83bf..6cece0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -28,8 +28,6 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; @@ -40,10 +38,10 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; /** * Interface for objects that hold a column family in a Region. Its a memstore and a set of zero or @@ -63,9 +61,9 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf // General Accessors CellComparator getComparator(); - Collection<StoreFile> getStorefiles(); + Collection<? extends StoreFile> getStorefiles(); - Collection<StoreFile> getCompactedFiles(); + Collection<? extends StoreFile> getCompactedFiles(); /** * Close all the readers We don't need to worry about subsequent requests because the Region @@ -73,7 +71,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * @return the {@link StoreFile StoreFiles} that were previously being used. * @throws IOException on failure */ - Collection<StoreFile> close() throws IOException; + Collection<? extends StoreFile> close() throws IOException; /** * Return a scanner for both the memstore and the HStore files. Assumes we are not in a @@ -86,105 +84,6 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt) throws IOException; - /** - * Get all scanners with no filtering based on TTL (that happens further down the line). - * @param cacheBlocks cache the blocks or not - * @param usePread true to use pread, false if not - * @param isCompaction true if the scanner is created for compaction - * @param matcher the scan query matcher - * @param startRow the start row - * @param stopRow the stop row - * @param readPt the read point of the current scan - * @return all scanners for this store - */ - default List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean isGet, boolean usePread, - boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, byte[] stopRow, long readPt) - throws IOException { - return getScanners(cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, false, - readPt); - } - - /** - * Get all scanners with no filtering based on TTL (that happens further down the line). - * @param cacheBlocks cache the blocks or not - * @param usePread true to use pread, false if not - * @param isCompaction true if the scanner is created for compaction - * @param matcher the scan query matcher - * @param startRow the start row - * @param includeStartRow true to include start row, false if not - * @param stopRow the stop row - * @param includeStopRow true to include stop row, false if not - * @param readPt the read point of the current scan - * @return all scanners for this store - */ - List<KeyValueScanner> getScanners(boolean cacheBlocks, boolean usePread, boolean isCompaction, - ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, byte[] stopRow, - boolean includeStopRow, long readPt) throws IOException; - - /** - * Recreates the scanners on the current list of active store file scanners - * @param currentFileScanners the current set of active store file scanners - * @param cacheBlocks cache the blocks or not - * @param usePread use pread or not - * @param isCompaction is the scanner for compaction - * @param matcher the scan query matcher - * @param startRow the scan's start row - * @param includeStartRow should the scan include the start row - * @param stopRow the scan's stop row - * @param includeStopRow should the scan include the stop row - * @param readPt the read point of the current scane - * @param includeMemstoreScanner whether the current scanner should include memstorescanner - * @return list of scanners recreated on the current Scanners - * @throws IOException - */ - List<KeyValueScanner> recreateScanners(List<KeyValueScanner> currentFileScanners, - boolean cacheBlocks, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, - byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow, long readPt, - boolean includeMemstoreScanner) throws IOException; - - /** - * Create scanners on the given files and if needed on the memstore with no filtering based on TTL - * (that happens further down the line). - * @param files the list of files on which the scanners has to be created - * @param cacheBlocks cache the blocks or not - * @param usePread true to use pread, false if not - * @param isCompaction true if the scanner is created for compaction - * @param matcher the scan query matcher - * @param startRow the start row - * @param stopRow the stop row - * @param readPt the read point of the current scan - * @param includeMemstoreScanner true if memstore has to be included - * @return scanners on the given files and on the memstore if specified - */ - default List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, - boolean isGet, boolean usePread, boolean isCompaction, ScanQueryMatcher matcher, - byte[] startRow, byte[] stopRow, long readPt, boolean includeMemstoreScanner) - throws IOException { - return getScanners(files, cacheBlocks, usePread, isCompaction, matcher, startRow, true, stopRow, - false, readPt, includeMemstoreScanner); - } - - /** - * Create scanners on the given files and if needed on the memstore with no filtering based on TTL - * (that happens further down the line). - * @param files the list of files on which the scanners has to be created - * @param cacheBlocks ache the blocks or not - * @param usePread true to use pread, false if not - * @param isCompaction true if the scanner is created for compaction - * @param matcher the scan query matcher - * @param startRow the start row - * @param includeStartRow true to include start row, false if not - * @param stopRow the stop row - * @param includeStopRow true to include stop row, false if not - * @param readPt the read point of the current scan - * @param includeMemstoreScanner true if memstore has to be included - * @return scanners on the given files and on the memstore if specified - */ - List<KeyValueScanner> getScanners(List<StoreFile> files, boolean cacheBlocks, boolean usePread, - boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow, boolean includeStartRow, - byte[] stopRow, boolean includeStopRow, long readPt, boolean includeMemstoreScanner) - throws IOException; - ScanInfo getScanInfo(); /** @@ -194,7 +93,6 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf FileSystem getFileSystem(); - /** * @param maxKeyCount * @param compression Compression algorithm to use @@ -269,10 +167,10 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf * @deprecated see compact(CompactionContext, ThroughputController, User) */ @Deprecated - List<StoreFile> compact(CompactionContext compaction, + List<? extends StoreFile> compact(CompactionContext compaction, ThroughputController throughputController) throws IOException; - List<StoreFile> compact(CompactionContext compaction, + List<? extends StoreFile> compact(CompactionContext compaction, ThroughputController throughputController, User user) throws IOException; /** @@ -297,10 +195,9 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf boolean canSplit(); /** - * Determines if Store should be split - * @return byte[] if store should be split, null otherwise. + * Determines if Store should be split. */ - byte[] getSplitPoint(); + Optional<byte[]> getSplitPoint(); // General accessors into the state of the store // TODO abstract some of this out into a metrics class
http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java index fb5f0e4..60b3c3d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java @@ -22,13 +22,13 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.List; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.yetus.audience.InterfaceAudience; /** * StoreEngine is a factory that can create the objects necessary for HStore to operate. @@ -84,7 +84,7 @@ public abstract class StoreEngine<SF extends StoreFlusher, * @param filesCompacting Files currently compacting * @return whether a compaction selection is possible */ - public abstract boolean needsCompaction(List<StoreFile> filesCompacting); + public abstract boolean needsCompaction(List<HStoreFile> filesCompacting); /** * Creates an instance of a compaction context specific to this engine. @@ -97,13 +97,13 @@ public abstract class StoreEngine<SF extends StoreFlusher, * Create the StoreEngine's components. */ protected abstract void createComponents( - Configuration conf, Store store, CellComparator kvComparator) throws IOException; + Configuration conf, HStore store, CellComparator cellComparator) throws IOException; private void createComponentsOnce( - Configuration conf, Store store, CellComparator kvComparator) throws IOException { + Configuration conf, HStore store, CellComparator cellComparator) throws IOException { assert compactor == null && compactionPolicy == null && storeFileManager == null && storeFlusher == null; - createComponents(conf, store, kvComparator); + createComponents(conf, store, cellComparator); assert compactor != null && compactionPolicy != null && storeFileManager != null && storeFlusher != null; } @@ -113,16 +113,16 @@ public abstract class StoreEngine<SF extends StoreFlusher, * @param store The store. An unfortunate dependency needed due to it * being passed to coprocessors via the compactor. * @param conf Store configuration. - * @param kvComparator KVComparator for storeFileManager. + * @param cellComparator CellComparator for storeFileManager. * @return StoreEngine to use. */ public static StoreEngine<?, ?, ?, ?> create( - Store store, Configuration conf, CellComparator kvComparator) throws IOException { + HStore store, Configuration conf, CellComparator cellComparator) throws IOException { String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName()); try { StoreEngine<?,?,?,?> se = ReflectionUtils.instantiateWithCustomCtor( className, new Class[] { }, new Object[] { }); - se.createComponentsOnce(conf, store, kvComparator); + se.createComponentsOnce(conf, store, cellComparator); return se; } catch (Exception e) { throw new IOException("Unable to load configured store engine '" + className + "'", e); http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index d5e51ed..0097bd7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.Comparator; +import java.util.Optional; import java.util.OptionalLong; import org.apache.hadoop.fs.Path; @@ -27,73 +28,38 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.util.Bytes; /** * An interface to describe a store data file. + * <p> + * <strong>NOTICE: </strong>this interface is mainly designed for coprocessor, so it will not expose + * all the internal APIs for a 'store file'. If you are implementing something inside HBase, i.e, + * not a coprocessor hook, usually you should use {@link HStoreFile} directly as it is the only + * implementation of this interface. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving public interface StoreFile { - static final String STORE_FILE_READER_NO_READAHEAD = "hbase.store.reader.no-readahead"; - - // Keys for fileinfo values in HFile - - /** Max Sequence ID in FileInfo */ - static final byte[] MAX_SEQ_ID_KEY = Bytes.toBytes("MAX_SEQ_ID_KEY"); - - /** Major compaction flag in FileInfo */ - static final byte[] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY"); - - /** Minor compaction flag in FileInfo */ - static final byte[] EXCLUDE_FROM_MINOR_COMPACTION_KEY = - Bytes.toBytes("EXCLUDE_FROM_MINOR_COMPACTION"); - - /** Bloom filter Type in FileInfo */ - static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE"); - - /** Delete Family Count in FileInfo */ - static final byte[] DELETE_FAMILY_COUNT = Bytes.toBytes("DELETE_FAMILY_COUNT"); - - /** Last Bloom filter key in FileInfo */ - static final byte[] LAST_BLOOM_KEY = Bytes.toBytes("LAST_BLOOM_KEY"); - - /** Key for Timerange information in metadata */ - static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE"); - - /** Key for timestamp of earliest-put in metadata */ - static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS"); - - /** Key for the number of mob cells in metadata */ - static final byte[] MOB_CELLS_COUNT = Bytes.toBytes("MOB_CELLS_COUNT"); - - /** Meta key set when store file is a result of a bulk load */ - static final byte[] BULKLOAD_TASK_KEY = Bytes.toBytes("BULKLOAD_SOURCE_TASK"); - static final byte[] BULKLOAD_TIME_KEY = Bytes.toBytes("BULKLOAD_TIMESTAMP"); - /** - * Key for skipping resetting sequence id in metadata. For bulk loaded hfiles, the scanner resets - * the cell seqId with the latest one, if this metadata is set as true, the reset is skipped. + * Get the first key in this store file. */ - static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID"); - - CacheConfig getCacheConf(); + Optional<Cell> getFirstKey(); - Cell getFirstKey(); - - Cell getLastKey(); + /** + * Get the last key in this store file. + */ + Optional<Cell> getLastKey(); + /** + * Get the comparator for comparing two cells. + */ Comparator<Cell> getComparator(); - long getMaxMemstoreTS(); - /** - * @return the StoreFile object associated to this StoreFile. null if the StoreFile is not a - * reference. + * Get max of the MemstoreTS in the KV's in this store file. */ - StoreFileInfo getFileInfo(); + long getMaxMemstoreTS(); /** * @return Path or null if this StoreFile was made with a Stream. @@ -130,14 +96,11 @@ public interface StoreFile { */ long getMaxSequenceId(); - long getModificationTimeStamp() throws IOException; - /** - * Only used by the Striped Compaction Policy - * @param key - * @return value associated with the metadata key + * Get the modification time of this store file. Usually will access the file system so throws + * IOException. */ - byte[] getMetadataValue(byte[] key); + long getModificationTimeStamp() throws IOException; /** * Check if this storefile was created by bulk load. When a hfile is bulk loaded into HBase, we @@ -149,13 +112,6 @@ public interface StoreFile { */ boolean isBulkLoadResult(); - boolean isCompactedAway(); - - /** - * @return true if the file is still used in reads - */ - boolean isReferencedInReads(); - /** * Return the timestamp at which this bulk load file was generated. */ @@ -168,49 +124,17 @@ public interface StoreFile { HDFSBlocksDistribution getHDFSBlockDistribution(); /** - * Initialize the reader used for pread. - */ - void initReader() throws IOException; - - /** - * Must be called after initReader. - */ - StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder, - boolean canOptimizeForNonNullColumn); - - StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, - boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) - throws IOException; - - /** - * @return Current reader. Must call initReader first else returns null. - * @see #initReader() - */ - StoreFileReader getReader(); - - /** - * @param evictOnClose whether to evict blocks belonging to this file - * @throws IOException - */ - void closeReader(boolean evictOnClose) throws IOException; - - /** - * Marks the status of the file as compactedAway. + * @return a length description of this StoreFile, suitable for debug output */ - void markCompactedAway(); + String toStringDetailed(); /** - * Delete this file - * @throws IOException + * Get the min timestamp of all the cells in the store file. */ - void deleteReader() throws IOException; + OptionalLong getMinimumTimestamp(); /** - * @return a length description of this StoreFile, suitable for debug output + * Get the max timestamp of all the cells in the store file. */ - String toStringDetailed(); - - OptionalLong getMinimumTimestamp(); - OptionalLong getMaximumTimestamp(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileComparators.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileComparators.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileComparators.java index e8ec9fd..cd265e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileComparators.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileComparators.java @@ -26,17 +26,17 @@ import java.util.Comparator; import org.apache.yetus.audience.InterfaceAudience; /** - * Useful comparators for comparing StoreFiles. + * Useful comparators for comparing store files. */ @InterfaceAudience.Private final class StoreFileComparators { /** - * Comparator that compares based on the Sequence Ids of the the StoreFiles. Bulk loads that did + * Comparator that compares based on the Sequence Ids of the the store files. Bulk loads that did * not request a seq ID are given a seq id of -1; thus, they are placed before all non- bulk * loads, and bulk loads with sequence Id. Among these files, the size is used to determine the * ordering, then bulkLoadTime. If there are ties, the path name is used as a tie-breaker. */ - public static final Comparator<StoreFile> SEQ_ID = + public static final Comparator<HStoreFile> SEQ_ID = Ordering.compound(ImmutableList.of(Ordering.natural().onResultOf(new GetSeqId()), Ordering.natural().onResultOf(new GetFileSize()).reverse(), Ordering.natural().onResultOf(new GetBulkTime()), @@ -46,23 +46,23 @@ final class StoreFileComparators { * Comparator for time-aware compaction. SeqId is still the first ordering criterion to maintain * MVCC. */ - public static final Comparator<StoreFile> SEQ_ID_MAX_TIMESTAMP = + public static final Comparator<HStoreFile> SEQ_ID_MAX_TIMESTAMP = Ordering.compound(ImmutableList.of(Ordering.natural().onResultOf(new GetSeqId()), Ordering.natural().onResultOf(new GetMaxTimestamp()), Ordering.natural().onResultOf(new GetFileSize()).reverse(), Ordering.natural().onResultOf(new GetBulkTime()), Ordering.natural().onResultOf(new GetPathName()))); - private static class GetSeqId implements Function<StoreFile, Long> { + private static class GetSeqId implements Function<HStoreFile, Long> { @Override - public Long apply(StoreFile sf) { + public Long apply(HStoreFile sf) { return sf.getMaxSequenceId(); } } - private static class GetFileSize implements Function<StoreFile, Long> { + private static class GetFileSize implements Function<HStoreFile, Long> { @Override - public Long apply(StoreFile sf) { + public Long apply(HStoreFile sf) { if (sf.getReader() != null) { return sf.getReader().length(); } else { @@ -73,23 +73,23 @@ final class StoreFileComparators { } } - private static class GetBulkTime implements Function<StoreFile, Long> { + private static class GetBulkTime implements Function<HStoreFile, Long> { @Override - public Long apply(StoreFile sf) { + public Long apply(HStoreFile sf) { return sf.getBulkLoadTimestamp().orElse(Long.MAX_VALUE); } } - private static class GetPathName implements Function<StoreFile, String> { + private static class GetPathName implements Function<HStoreFile, String> { @Override - public String apply(StoreFile sf) { + public String apply(HStoreFile sf) { return sf.getPath().getName(); } } - private static class GetMaxTimestamp implements Function<StoreFile, Long> { + private static class GetMaxTimestamp implements Function<HStoreFile, Long> { @Override - public Long apply(StoreFile sf) { + public Long apply(HStoreFile sf) { return sf.getMaximumTimestamp().orElse(Long.MAX_VALUE); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index c774080..67ef4de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -23,13 +23,14 @@ import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.List; - -import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableCollection; +import java.util.Optional; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableCollection; + /** * Manages the store files and basic metadata about that that determines the logical structure * (e.g. what files to return for scan, how to determine split point, and such). @@ -45,13 +46,13 @@ public interface StoreFileManager { * Loads the initial store files into empty StoreFileManager. * @param storeFiles The files to load. */ - void loadFiles(List<StoreFile> storeFiles); + void loadFiles(List<HStoreFile> storeFiles); /** * Adds new files, either for from MemStore flush or bulk insert, into the structure. * @param sfs New store files. */ - void insertNewFiles(Collection<StoreFile> sfs) throws IOException; + void insertNewFiles(Collection<HStoreFile> sfs) throws IOException; /** * Adds only the new compaction results into the structure. @@ -59,34 +60,34 @@ public interface StoreFileManager { * @param results The resulting files for the compaction. */ void addCompactionResults( - Collection<StoreFile> compactedFiles, Collection<StoreFile> results) throws IOException; + Collection<HStoreFile> compactedFiles, Collection<HStoreFile> results) throws IOException; /** * Remove the compacted files * @param compactedFiles the list of compacted files * @throws IOException */ - void removeCompactedFiles(Collection<StoreFile> compactedFiles) throws IOException; + void removeCompactedFiles(Collection<HStoreFile> compactedFiles) throws IOException; /** * Clears all the files currently in use and returns them. * @return The files previously in use. */ - ImmutableCollection<StoreFile> clearFiles(); + ImmutableCollection<HStoreFile> clearFiles(); /** * Clears all the compacted files and returns them. This method is expected to be * accessed single threaded. * @return The files compacted previously. */ - Collection<StoreFile> clearCompactedFiles(); + Collection<HStoreFile> clearCompactedFiles(); /** * Gets the snapshot of the store files currently in use. Can be used for things like metrics * and checks; should not assume anything about relations between store files in the list. * @return The list of StoreFiles. */ - Collection<StoreFile> getStorefiles(); + Collection<HStoreFile> getStorefiles(); /** * List of compacted files inside this store that needs to be excluded in reads @@ -95,7 +96,7 @@ public interface StoreFileManager { * compacted files are done. * @return the list of compacted files */ - Collection<StoreFile> getCompactedfiles(); + Collection<HStoreFile> getCompactedfiles(); /** * Returns the number of files currently in use. @@ -115,7 +116,7 @@ public interface StoreFileManager { * @param stopRow Stop row of the request. * @return The list of files that are to be read for this request. */ - Collection<StoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow, byte[] stopRow, + Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow, byte[] stopRow, boolean includeStopRow); /** @@ -124,9 +125,7 @@ public interface StoreFileManager { * @return The files that may have the key less than or equal to targetKey, in reverse * order of new-ness, and preference for target key. */ - Iterator<StoreFile> getCandidateFilesForRowKeyBefore( - KeyValue targetKey - ); + Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(KeyValue targetKey); /** * Updates the candidate list for finding row key before. Based on the list of candidates @@ -139,17 +138,16 @@ public interface StoreFileManager { * @param candidate The current best candidate found. * @return The list to replace candidateFiles. */ - Iterator<StoreFile> updateCandidateFilesForRowKeyBefore( - Iterator<StoreFile> candidateFiles, KeyValue targetKey, Cell candidate - ); + Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(Iterator<HStoreFile> candidateFiles, + KeyValue targetKey, Cell candidate); /** * Gets the split point for the split of this set of store files (approx. middle). - * @return The mid-point, or null if no split is possible. + * @return The mid-point if possible. * @throws IOException */ - byte[] getSplitPoint() throws IOException; + Optional<byte[]> getSplitPoint() throws IOException; /** * @return The store compaction priority. @@ -161,7 +159,7 @@ public interface StoreFileManager { * @param filesCompacting Files that are currently compacting. * @return The files which don't have any necessary data according to TTL and other criteria. */ - Collection<StoreFile> getUnneededFiles(long maxTs, List<StoreFile> filesCompacting); + Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting); /** * @return the compaction pressure used for compaction throughput tuning. @@ -171,7 +169,7 @@ public interface StoreFileManager { /** * @return the comparator used to sort storefiles. Usually, the - * {@link StoreFile#getMaxSequenceId()} is the first priority. + * {@link HStoreFile#getMaxSequenceId()} is the first priority. */ - Comparator<StoreFile> getStoreFileComparator(); + Comparator<HStoreFile> getStoreFileComparator(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index e3f97a2..67b8fbd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,9 +17,14 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.LAST_BLOOM_KEY; + import java.io.DataInput; import java.io.IOException; import java.util.Map; +import java.util.Optional; import java.util.SortedSet; import java.util.concurrent.atomic.AtomicInteger; @@ -414,40 +418,40 @@ public class StoreFileReader { * @return true if there is overlap, false otherwise */ public boolean passesKeyRangeFilter(Scan scan) { - if (this.getFirstKey() == null || this.getLastKey() == null) { + Optional<Cell> firstKeyKV = this.getFirstKey(); + Optional<Cell> lastKeyKV = this.getLastKey(); + if (!firstKeyKV.isPresent() || !lastKeyKV.isPresent()) { // the file is empty return false; } - if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW) - && Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { + if (Bytes.equals(scan.getStartRow(), HConstants.EMPTY_START_ROW) && + Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) { return true; } byte[] smallestScanRow = scan.isReversed() ? scan.getStopRow() : scan.getStartRow(); byte[] largestScanRow = scan.isReversed() ? scan.getStartRow() : scan.getStopRow(); - Cell firstKeyKV = this.getFirstKey(); - Cell lastKeyKV = this.getLastKey(); - boolean nonOverLapping = (getComparator().compareRows(firstKeyKV, - largestScanRow, 0, largestScanRow.length) > 0 - && !Bytes - .equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(), - HConstants.EMPTY_END_ROW)) - || getComparator().compareRows(lastKeyKV, smallestScanRow, 0, smallestScanRow.length) < 0; + boolean nonOverLapping = (getComparator() + .compareRows(firstKeyKV.get(), largestScanRow, 0, largestScanRow.length) > 0 && + !Bytes.equals(scan.isReversed() ? scan.getStartRow() : scan.getStopRow(), + HConstants.EMPTY_END_ROW)) || + getComparator().compareRows(lastKeyKV.get(), smallestScanRow, 0, + smallestScanRow.length) < 0; return !nonOverLapping; } public Map<byte[], byte[]> loadFileInfo() throws IOException { Map<byte [], byte []> fi = reader.loadFileInfo(); - byte[] b = fi.get(StoreFile.BLOOM_FILTER_TYPE_KEY); + byte[] b = fi.get(BLOOM_FILTER_TYPE_KEY); if (b != null) { bloomFilterType = BloomType.valueOf(Bytes.toString(b)); } - lastBloomKey = fi.get(StoreFile.LAST_BLOOM_KEY); + lastBloomKey = fi.get(LAST_BLOOM_KEY); if(bloomFilterType == BloomType.ROWCOL) { lastBloomKeyOnlyKV = new KeyValue.KeyOnlyKeyValue(lastBloomKey, 0, lastBloomKey.length); } - byte[] cnt = fi.get(StoreFile.DELETE_FAMILY_COUNT); + byte[] cnt = fi.get(DELETE_FAMILY_COUNT); if (cnt != null) { deleteFamilyCnt = Bytes.toLong(cnt); } @@ -537,16 +541,16 @@ public class StoreFileReader { this.deleteFamilyBloomFilter = null; } - public Cell getLastKey() { + public Optional<Cell> getLastKey() { return reader.getLastKey(); } - public byte[] getLastRowKey() { + public Optional<byte[]> getLastRowKey() { return reader.getLastRowKey(); } - public Cell midkey() throws IOException { - return reader.midkey(); + public Optional<Cell> midKey() throws IOException { + return reader.midKey(); } public long length() { @@ -565,7 +569,7 @@ public class StoreFileReader { return deleteFamilyCnt; } - public Cell getFirstKey() { + public Optional<Cell> getFirstKey() { return reader.getFirstKey(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 08111dc..f21b30b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.PriorityQueue; import java.util.concurrent.atomic.LongAdder; @@ -102,7 +103,7 @@ public class StoreFileScanner implements KeyValueScanner { /** * Return an array of scanners corresponding to the given set of store files. */ - public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files, + public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files, boolean cacheBlocks, boolean usePread, long readPt) throws IOException { return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt); } @@ -110,7 +111,7 @@ public class StoreFileScanner implements KeyValueScanner { /** * Return an array of scanners corresponding to the given set of store files. */ - public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files, + public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files, boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind, long readPt) throws IOException { return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null, @@ -121,7 +122,7 @@ public class StoreFileScanner implements KeyValueScanner { * Return an array of scanners corresponding to the given set of store files, And set the * ScanQueryMatcher for each store file scanner for further optimization */ - public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files, + public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files, boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop, ScanQueryMatcher matcher, long readPt) throws IOException { if (files.isEmpty()) { @@ -129,15 +130,15 @@ public class StoreFileScanner implements KeyValueScanner { } List<StoreFileScanner> scanners = new ArrayList<>(files.size()); boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false; - PriorityQueue<StoreFile> sortedFiles = + PriorityQueue<HStoreFile> sortedFiles = new PriorityQueue<>(files.size(), StoreFileComparators.SEQ_ID); - for (StoreFile file : files) { + for (HStoreFile file : files) { // The sort function needs metadata so we need to open reader first before sorting the list. file.initReader(); sortedFiles.add(file); } for (int i = 0, n = files.size(); i < n; i++) { - StoreFile sf = sortedFiles.remove(); + HStoreFile sf = sortedFiles.remove(); StoreFileScanner scanner; if (usePread) { scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn); @@ -154,10 +155,10 @@ public class StoreFileScanner implements KeyValueScanner { * Get scanners for compaction. We will create a separated reader for each store file to avoid * contention with normal read request. */ - public static List<StoreFileScanner> getScannersForCompaction(Collection<StoreFile> files, + public static List<StoreFileScanner> getScannersForCompaction(Collection<HStoreFile> files, boolean canUseDropBehind, long readPt) throws IOException { List<StoreFileScanner> scanners = new ArrayList<>(files.size()); - List<StoreFile> sortedFiles = new ArrayList<>(files); + List<HStoreFile> sortedFiles = new ArrayList<>(files); Collections.sort(sortedFiles, StoreFileComparators.SEQ_ID); boolean succ = false; try { @@ -537,12 +538,11 @@ public class StoreFileScanner implements KeyValueScanner { @Override public boolean seekToLastRow() throws IOException { - byte[] lastRow = reader.getLastRowKey(); - if (lastRow == null) { + Optional<byte[]> lastRow = reader.getLastRowKey(); + if (!lastRow.isPresent()) { return false; } - Cell seekKey = CellUtil - .createFirstOnRow(lastRow, 0, (short) lastRow.length); + Cell seekKey = CellUtil.createFirstOnRow(lastRow.get()); if (seek(seekKey)) { return true; } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java index 25b9aa1..4dbe280 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,7 +17,13 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.DELETE_FAMILY_COUNT; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAX_SEQ_ID_KEY; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.MOB_CELLS_COUNT; +import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; import java.io.IOException; import java.net.InetSocketAddress; @@ -33,10 +38,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; @@ -48,6 +52,9 @@ import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.RowBloomContext; import org.apache.hadoop.hbase.util.RowColBloomContext; import org.apache.hadoop.io.WritableUtils; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; /** * A StoreFile writer. Use this to read/write HBase Store Files. It is package @@ -185,10 +192,9 @@ public class StoreFileWriter implements CellSink, ShipperListener { * @throws IOException problem writing to FS */ public void appendMetadata(final long maxSequenceId, final boolean majorCompaction) - throws IOException { - writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); - writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, - Bytes.toBytes(majorCompaction)); + throws IOException { + writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); + writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); appendTrackedTimestampsToMetadata(); } @@ -202,9 +208,9 @@ public class StoreFileWriter implements CellSink, ShipperListener { */ public void appendMetadata(final long maxSequenceId, final boolean majorCompaction, final long mobCellsCount) throws IOException { - writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); - writer.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); - writer.appendFileInfo(StoreFile.MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); + writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); + writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); + writer.appendFileInfo(MOB_CELLS_COUNT, Bytes.toBytes(mobCellsCount)); appendTrackedTimestampsToMetadata(); } @@ -212,8 +218,8 @@ public class StoreFileWriter implements CellSink, ShipperListener { * Add TimestampRange and earliest put timestamp to Metadata */ public void appendTrackedTimestampsToMetadata() throws IOException { - appendFileInfo(StoreFile.TIMERANGE_KEY, WritableUtils.toByteArray(timeRangeTracker)); - appendFileInfo(StoreFile.EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); + appendFileInfo(TIMERANGE_KEY, WritableUtils.toByteArray(timeRangeTracker)); + appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs)); } /** @@ -310,8 +316,7 @@ public class StoreFileWriter implements CellSink, ShipperListener { // add the general Bloom filter writer and append file info if (hasGeneralBloom) { writer.addGeneralBloomFilter(generalBloomFilterWriter); - writer.appendFileInfo(StoreFile.BLOOM_FILTER_TYPE_KEY, - Bytes.toBytes(bloomType.toString())); + writer.appendFileInfo(BLOOM_FILTER_TYPE_KEY, Bytes.toBytes(bloomType.toString())); bloomContext.addLastBloomKey(writer); } return hasGeneralBloom; @@ -327,8 +332,7 @@ public class StoreFileWriter implements CellSink, ShipperListener { // append file info about the number of delete family kvs // even if there is no delete family Bloom. - writer.appendFileInfo(StoreFile.DELETE_FAMILY_COUNT, - Bytes.toBytes(this.deleteFamilyCnt)); + writer.appendFileInfo(DELETE_FAMILY_COUNT, Bytes.toBytes(this.deleteFamilyCnt)); return hasDeleteFamilyBloom; } @@ -501,7 +505,7 @@ public class StoreFileWriter implements CellSink, ShipperListener { } // set block storage policy for temp path - String policyName = this.conf.get(HColumnDescriptor.STORAGE_POLICY); + String policyName = this.conf.get(ColumnFamilyDescriptorBuilder.STORAGE_POLICY); if (null == policyName) { policyName = this.conf.get(HStore.BLOCK_STORAGE_POLICY_KEY); } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index f670ade..bc5a7cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -29,10 +29,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.yetus.audience.InterfaceAudience; /** * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). @@ -41,9 +41,9 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; @InterfaceAudience.Private abstract class StoreFlusher { protected Configuration conf; - protected Store store; + protected HStore store; - public StoreFlusher(Configuration conf, Store store) { + public StoreFlusher(Configuration conf, HStore store) { this.conf = conf; this.store = store; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 3a98479..dd68d28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -16,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.regionserver; import java.io.IOException; @@ -38,7 +37,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.IsolationLevel; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.executor.ExecutorService; @@ -51,6 +49,7 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; @@ -67,7 +66,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver { private static final Log LOG = LogFactory.getLog(StoreScanner.class); // In unit tests, the store could be null - protected final Optional<Store> store; + protected final Optional<HStore> store; private ScanQueryMatcher matcher; protected KeyValueHeap heap; private boolean cacheBlocks; @@ -147,7 +146,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Indicates whether there was flush during the course of the scan private volatile boolean flushed = false; // generally we get one file from a flush - private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1); + private final List<HStoreFile> flushedStoreFiles = new ArrayList<>(1); // Since CompactingMemstore is now default, we get three memstore scanners from a flush private final List<KeyValueScanner> memStoreScannersAfterFlush = new ArrayList<>(3); // The current list of scanners @@ -160,7 +159,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private boolean topChanged = false; /** An internal constructor. */ - private StoreScanner(Optional<Store> store, Scan scan, ScanInfo scanInfo, + private StoreScanner(Optional<HStore> store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) { this.readPt = readPt; this.store = store; @@ -223,7 +222,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @param columns which columns we are scanning * @throws IOException */ - public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, + public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, long readPt) throws IOException { this(Optional.of(store), scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(), ScanType.USER_SCAN); @@ -275,7 +274,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @param scanners ancillary scanners * @param smallestReadPoint the readPoint that we should use for tracking versions */ - public StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions, + public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions, List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { this(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs, null, @@ -292,20 +291,20 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. */ - public StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions, + public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions, List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { this(store, scanInfo, maxVersions, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); } - private StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions, + private StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions, List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { this(Optional.of(store), maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt()) : SCAN_FOR_COMPACTION, - scanInfo, 0, ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), + scanInfo, 0, store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); assert scanType != ScanType.USER_SCAN; matcher = @@ -844,9 +843,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Implementation of ChangedReadersObserver @Override - public void updateReaders(List<StoreFile> sfs, List<KeyValueScanner> memStoreScanners) throws IOException { - if (CollectionUtils.isEmpty(sfs) - && CollectionUtils.isEmpty(memStoreScanners)) { + public void updateReaders(List<HStoreFile> sfs, List<KeyValueScanner> memStoreScanners) + throws IOException { + if (CollectionUtils.isEmpty(sfs) && CollectionUtils.isEmpty(memStoreScanners)) { return; } flushLock.lock(); @@ -868,7 +867,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner */ protected final boolean reopenAfterFlush() throws IOException { // here we can make sure that we have a Store instance. - Store store = this.store.get(); + HStore store = this.store.get(); Cell lastTop = heap.peek(); // When we have the scan object, should we not pass it to getScanners() to get a limited set of // scanners? We did so in the constructor and we could have done it now by storing the scan @@ -996,7 +995,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List<KeyValueScanner> newCurrentScanners; KeyValueHeap newHeap; // We must have a store instance here - Store store = this.store.get(); + HStore store = this.store.get(); try { // recreate the scanners on the current file scanners fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java index 9104546..0abaffd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java @@ -41,7 +41,7 @@ public class StoreUtils { /** * Creates a deterministic hash code for store file collection. */ - public static OptionalInt getDeterministicRandomSeed(Collection<StoreFile> files) { + public static OptionalInt getDeterministicRandomSeed(Collection<HStoreFile> files) { return files.stream().mapToInt(f -> f.getPath().getName().hashCode()).findFirst(); } @@ -49,24 +49,17 @@ public class StoreUtils { * Determines whether any files in the collection are references. * @param files The files. */ - public static boolean hasReferences(final Collection<StoreFile> files) { - if (files != null) { - for (StoreFile hsf: files) { - if (hsf.isReference()) { - return true; - } - } - } - return false; + public static boolean hasReferences(Collection<HStoreFile> files) { + // TODO: make sure that we won't pass null here in the future. + return files != null ? files.stream().anyMatch(HStoreFile::isReference) : false; } /** * Gets lowest timestamp from candidate StoreFiles */ - public static long getLowestTimestamp(final Collection<StoreFile> candidates) - throws IOException { + public static long getLowestTimestamp(Collection<HStoreFile> candidates) throws IOException { long minTs = Long.MAX_VALUE; - for (StoreFile storeFile : candidates) { + for (HStoreFile storeFile : candidates) { minTs = Math.min(minTs, storeFile.getModificationTimeStamp()); } return minTs; @@ -77,7 +70,7 @@ public class StoreUtils { * @param candidates The files to choose from. * @return The largest file; null if no file has a reader. */ - static Optional<StoreFile> getLargestFile(Collection<StoreFile> candidates) { + static Optional<HStoreFile> getLargestFile(Collection<HStoreFile> candidates) { return candidates.stream().filter(f -> f.getReader() != null) .max((f1, f2) -> Long.compare(f1.getReader().length(), f2.getReader().length())); } @@ -89,29 +82,19 @@ public class StoreUtils { * @return 0 if no non-bulk-load files are provided or, this is Store that does not yet have any * store files. */ - public static long getMaxMemstoreTSInList(Collection<StoreFile> sfs) { - long max = 0; - for (StoreFile sf : sfs) { - if (!sf.isBulkLoadResult()) { - max = Math.max(max, sf.getMaxMemstoreTS()); - } - } - return max; + public static long getMaxMemstoreTSInList(Collection<HStoreFile> sfs) { + return sfs.stream().filter(sf -> !sf.isBulkLoadResult()).mapToLong(HStoreFile::getMaxMemstoreTS) + .max().orElse(0L); } /** - * Return the highest sequence ID found across all storefiles in - * the given list. + * Return the highest sequence ID found across all storefiles in the given list. * @param sfs - * @return 0 if no non-bulk-load files are provided or, this is Store that - * does not yet have any store files. + * @return 0 if no non-bulk-load files are provided or, this is Store that does not yet have any + * store files. */ - public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) { - long max = 0; - for (StoreFile sf : sfs) { - max = Math.max(max, sf.getMaxSequenceId()); - } - return max; + public static long getMaxSequenceIdInList(Collection<HStoreFile> sfs) { + return sfs.stream().mapToLong(HStoreFile::getMaxSequenceId).max().orElse(0L); } /** @@ -120,7 +103,7 @@ public class StoreUtils { * @param comparator Comparator used to compare KVs. * @return The split point row, or null if splitting is not possible, or reader is null. */ - static Optional<byte[]> getFileSplitPoint(StoreFile file, CellComparator comparator) + static Optional<byte[]> getFileSplitPoint(HStoreFile file, CellComparator comparator) throws IOException { StoreFileReader reader = file.getReader(); if (reader == null) { @@ -130,20 +113,31 @@ public class StoreUtils { // Get first, last, and mid keys. Midkey is the key that starts block // in middle of hfile. Has column and timestamp. Need to return just // the row we want to split on as midkey. - Cell midkey = reader.midkey(); - if (midkey != null) { - Cell firstKey = reader.getFirstKey(); - Cell lastKey = reader.getLastKey(); - // if the midkey is the same as the first or last keys, we cannot (ever) split this region. - if (comparator.compareRows(midkey, firstKey) == 0 || - comparator.compareRows(midkey, lastKey) == 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("cannot split because midkey is the same as first or last row"); - } - return Optional.empty(); + Optional<Cell> optionalMidKey = reader.midKey(); + if (!optionalMidKey.isPresent()) { + return Optional.empty(); + } + Cell midKey = optionalMidKey.get(); + Cell firstKey = reader.getFirstKey().get(); + Cell lastKey = reader.getLastKey().get(); + // if the midkey is the same as the first or last keys, we cannot (ever) split this region. + if (comparator.compareRows(midKey, firstKey) == 0 || + comparator.compareRows(midKey, lastKey) == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("cannot split because midkey is the same as first or last row"); } - return Optional.of(CellUtil.cloneRow(midkey)); + return Optional.empty(); } - return Optional.empty(); + return Optional.of(CellUtil.cloneRow(midKey)); + } + + /** + * Gets the mid point of the largest file passed in as split point. + */ + static Optional<byte[]> getSplitPoint(Collection<HStoreFile> storefiles, + CellComparator comparator) throws IOException { + Optional<HStoreFile> largestFile = StoreUtils.getLargestFile(storefiles); + return largestFile.isPresent() ? StoreUtils.getFileSplitPoint(largestFile.get(), comparator) + : Optional.empty(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a5f84430/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index b14b0d0..39f142f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -47,7 +47,7 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher, private StripeStoreConfig config; @Override - public boolean needsCompaction(List<StoreFile> filesCompacting) { + public boolean needsCompaction(List<HStoreFile> filesCompacting) { return this.compactionPolicy.needsCompactions(this.storeFileManager, filesCompacting); } @@ -58,7 +58,7 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher, @Override protected void createComponents( - Configuration conf, Store store, CellComparator comparator) throws IOException { + Configuration conf, HStore store, CellComparator comparator) throws IOException { this.config = new StripeStoreConfig(conf, store); this.compactionPolicy = new StripeCompactionPolicy(conf, store, config); this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config); @@ -74,12 +74,12 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher, private StripeCompactionPolicy.StripeCompactionRequest stripeRequest = null; @Override - public List<StoreFile> preSelect(List<StoreFile> filesCompacting) { + public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) { return compactionPolicy.preSelectFilesForCoprocessor(storeFileManager, filesCompacting); } @Override - public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction, + public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction, boolean mayUseOffPeak, boolean forceMajor) throws IOException { this.stripeRequest = compactionPolicy.selectCompaction( storeFileManager, filesCompacting, mayUseOffPeak);