HBASE-16324 Remove LegacyScanQueryMatcher
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8d33949b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8d33949b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8d33949b Branch: refs/heads/master Commit: 8d33949b8db072902783f63cd9aaa68cbd6b905f Parents: 2773510 Author: zhangduo <zhang...@apache.org> Authored: Fri Aug 25 17:02:03 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Sat Aug 26 08:04:43 2017 +0800 ---------------------------------------------------------------------- .../example/ZooKeeperScanPolicyObserver.java | 10 +- .../hbase/mob/DefaultMobStoreCompactor.java | 6 +- .../compactions/PartitionedMobCompactor.java | 8 +- .../hadoop/hbase/regionserver/HMobStore.java | 1 - .../MemStoreCompactorSegmentsIterator.java | 26 +- .../regionserver/ReversedStoreScanner.java | 8 +- .../hadoop/hbase/regionserver/StoreFlusher.java | 10 +- .../hadoop/hbase/regionserver/StoreScanner.java | 217 +++--- .../regionserver/compactions/Compactor.java | 18 +- .../querymatcher/LegacyScanQueryMatcher.java | 384 ----------- ...estAvoidCellReferencesIntoShippedBlocks.java | 11 +- .../hadoop/hbase/client/TestFromClientSide.java | 4 +- .../TestRegionObserverScannerOpenHook.java | 31 +- .../TestPartitionedMobCompactor.java | 6 +- .../regionserver/NoOpScanPolicyObserver.java | 24 +- .../regionserver/TestCompactingMemStore.java | 34 +- .../hbase/regionserver/TestDefaultMemStore.java | 66 +- .../regionserver/TestMobStoreCompaction.java | 5 +- .../regionserver/TestReversibleScanners.java | 22 +- .../hbase/regionserver/TestStoreScanner.java | 682 +++++++++---------- .../hbase/util/TestCoprocessorScanPolicy.java | 24 +- 21 files changed, 552 insertions(+), 1045 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index 35f85f7..b489fe4 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.coprocessor.example; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.NavigableSet; +import java.util.OptionalInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -194,9 +194,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver { // take default action return null; } - Scan scan = new Scan(); - scan.setMaxVersions(scanInfo.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, scanners, + return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } @@ -210,9 +208,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver { // take default action return null; } - Scan scan = new Scan(); - scan.setMaxVersions(scanInfo.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, scanners, scanType, + return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners, scanType, store.getSmallestReadPoint(), earliestPutTs); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index c475b17..89d2958 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -22,6 +22,7 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.OptionalInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,7 +33,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.CellSink; import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; @@ -74,9 +74,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { @Override public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException { - Scan scan = new Scan(); - scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions()); - return new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType, + return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType, smallestReadPoint, fd.earliestPutTs); } }; http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index c378a88..2308ddf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.crypto.Encryption; @@ -805,15 +804,12 @@ public class PartitionedMobCompactor extends MobCompactor { * @throws IOException if IO failure is encountered */ private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType) - throws IOException { + throws IOException { List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, true, false, false, HConstants.LATEST_TIMESTAMP); - Scan scan = new Scan(); - scan.setMaxVersions(column.getMaxVersions()); long ttl = HStore.determineTTLFromFamily(column); ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.COMPARATOR); - return new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L, - HConstants.LATEST_TIMESTAMP); + return new StoreScanner(scanInfo, scanType, scanners); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index fb837e8..f38ffb5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java index 8f481e0..5386c7d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java @@ -19,16 +19,16 @@ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; - import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.OptionalInt; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; /** * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator @@ -106,23 +106,15 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator /** * Creates the scanner for compacting the pipeline. - * * @return the scanner */ private StoreScanner createScanner(Store store, List<KeyValueScanner> scanners) throws IOException { - - Scan scan = new Scan(); - scan.setMaxVersions(); //Get all available versions - StoreScanner internalScanner = - new StoreScanner(store, store.getScanInfo(), scan, scanners, - ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); - - return internalScanner; + // Get all available versions + return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(Integer.MAX_VALUE), scanners, + ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } - /* Refill kev-value set (should be invoked only when KVS is empty) * Returns true if KVS is non-empty */ private boolean refillKVS() { http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 07f98ad..86b9ea9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -53,11 +53,9 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { } /** Constructor for testing. */ - ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, - final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners) - throws IOException { - super(scan, scanInfo, scanType, columns, scanners, - HConstants.LATEST_TIMESTAMP); + ReversedStoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, + List<? extends KeyValueScanner> scanners) throws IOException { + super(scan, scanInfo, columns, scanners); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 298f3d4..d29d48f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.OptionalInt; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; @@ -86,11 +85,8 @@ abstract class StoreFlusher { smallestReadPoint); } if (scanner == null) { - Scan scan = new Scan(); - scan.setMaxVersions(store.getScanInfo().getMaxVersions()); - scanner = new StoreScanner(store, store.getScanInfo(), scan, - snapshotScanners, ScanType.COMPACT_RETAIN_DELETES, - smallestReadPoint, HConstants.OLDEST_TIMESTAMP); + scanner = new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners, + ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); } assert scanner != null; if (store.getCoprocessorHost() != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index a220f54..9a096f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -24,6 +24,8 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; +import java.util.Optional; +import java.util.OptionalInt; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; @@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler; import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher; -import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher; import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; import org.apache.hadoop.hbase.util.CollectionUtils; @@ -66,7 +67,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver { private static final Log LOG = LogFactory.getLog(StoreScanner.class); // In unit tests, the store could be null - protected final Store store; + protected final Optional<Store> store; private ScanQueryMatcher matcher; protected KeyValueHeap heap; private boolean cacheBlocks; @@ -166,14 +167,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } /** An internal constructor. */ - protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo, - final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks, ScanType scanType) { + private StoreScanner(Optional<Store> store, Scan scan, ScanInfo scanInfo, + int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) { this.readPt = readPt; this.store = store; this.cacheBlocks = cacheBlocks; get = scan.isGetScan(); - int numCol = columns == null ? 0 : columns.size(); - explicitColumnQuery = numCol > 0; + explicitColumnQuery = numColumns > 0; this.scan = scan; this.now = EnvironmentEdgeManager.currentTime(); this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); @@ -183,13 +183,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // the seek operation. However, we also look the row-column Bloom filter // for multi-row (non-"get") scans because this is not done in // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>). - this.useRowColBloom = numCol > 1 || (!get && numCol == 1); - + this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1); this.maxRowSize = scanInfo.getTableMaxRowSize(); if (get) { this.readType = Scan.ReadType.PREAD; this.scanUsePread = true; - } else if(scanType != scanType.USER_SCAN) { + } else if(scanType != ScanType.USER_SCAN) { // For compaction scanners never use Pread as already we have stream based scanners on the // store files to be compacted this.readType = Scan.ReadType.STREAM; @@ -207,13 +206,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.preadMaxBytes = scanInfo.getPreadMaxBytes(); this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); // Parallel seeking is on if the config allows and more there is more than one store file. - if (this.store != null && this.store.getStorefilesCount() > 1) { - RegionServerServices rsService = ((HStore) store).getHRegion().getRegionServerServices(); - if (rsService != null && scanInfo.isParallelSeekEnabled()) { - this.parallelSeekEnabled = true; - this.executor = rsService.getExecutorService(); + this.store.ifPresent(s -> { + if (s.getStorefilesCount() > 1) { + RegionServerServices rsService = ((HStore) s).getHRegion().getRegionServerServices(); + if (rsService != null && scanInfo.isParallelSeekEnabled()) { + this.parallelSeekEnabled = true; + this.executor = rsService.getExecutorService(); + } } - } + }); } private void addCurrentScanners(List<? extends KeyValueScanner> scanners) { @@ -229,21 +230,23 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @param columns which columns we are scanning * @throws IOException */ - public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns, - long readPt) - throws IOException { - this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks(), ScanType.USER_SCAN); + public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns, + long readPt) throws IOException { + this(Optional.of(store), scan, scanInfo, columns != null ? columns.size() : 0, readPt, + scan.getCacheBlocks(), ScanType.USER_SCAN); if (columns != null && scan.isRaw()) { throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); } matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, store.getCoprocessorHost()); - this.store.addChangedReaderObserver(this); + store.addChangedReaderObserver(this); try { // Pass columns to try to filter out unnecessary StoreFiles. - List<KeyValueScanner> scanners = getScannersNoCompaction(); + List<KeyValueScanner> scanners = selectScannersFrom(store, + store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), + scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt)); // Seek all scanners to the start of the Row (or if the exact matching row // key does not exist, then to the start of the next matching Row). @@ -263,66 +266,61 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } catch (IOException e) { // remove us from the HStore#changedReaderObservers here or we'll have no chance to // and might cause memory leak - this.store.deleteChangedReaderObserver(this); + store.deleteChangedReaderObserver(this); throw e; } } + // a dummy scan instance for compaction. + private static final Scan SCAN_FOR_COMPACTION = new Scan(); + /** - * Used for compactions.<p> - * + * Used for compactions. + * <p> * Opens a scanner across specified StoreFiles. * @param store who we scan - * @param scan the spec * @param scanners ancillary scanners - * @param smallestReadPoint the readPoint that we should use for tracking - * versions + * @param smallestReadPoint the readPoint that we should use for tracking versions */ - public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, - List<? extends KeyValueScanner> scanners, ScanType scanType, - long smallestReadPoint, long earliestPutTs) throws IOException { - this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); + public StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions, + List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint, + long earliestPutTs) throws IOException { + this(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs, null, + null); } /** - * Used for compactions that drop deletes from a limited range of rows.<p> - * + * Used for compactions that drop deletes from a limited range of rows. + * <p> * Opens a scanner across specified StoreFiles. * @param store who we scan - * @param scan the spec * @param scanners ancillary scanners * @param smallestReadPoint the readPoint that we should use for tracking versions * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. */ - public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, + public StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions, List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { - this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, + this(store, scanInfo, maxVersions, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); } - private StoreScanner(Store store, ScanInfo scanInfo, Scan scan, + private StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions, List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { - this(store, scan, scanInfo, null, - ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); - if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0) - || (scan.getStopRow() != null && scan.getStopRow().length > 0) - || !scan.getTimeRange().isAllTime()) { - // use legacy query matcher since we do not consider the scan object in our code. Only used to - // keep compatibility for coprocessor. - matcher = LegacyScanQueryMatcher.create(scan, scanInfo, null, scanType, smallestReadPoint, - earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, - store.getCoprocessorHost()); - } else { - matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, - earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, - store.getCoprocessorHost()); - } + this(Optional.of(store), + maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt()) + : SCAN_FOR_COMPACTION, + scanInfo, 0, ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), + false, scanType); + assert scanType != ScanType.USER_SCAN; + matcher = + CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, + oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost()); // Filter the list of scanners using Bloom filters, time range, TTL, etc. - scanners = selectScannersFrom(scanners); + scanners = selectScannersFrom(store, scanners); // Seek all scanners to the initial key seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); @@ -331,62 +329,46 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner resetKVHeap(scanners, store.getComparator()); } - @VisibleForTesting - StoreScanner(final Scan scan, ScanInfo scanInfo, - ScanType scanType, final NavigableSet<byte[]> columns, - final List<? extends KeyValueScanner> scanners) throws IOException { - this(scan, scanInfo, scanType, columns, scanners, - HConstants.LATEST_TIMESTAMP, - // 0 is passed as readpoint because the test bypasses Store - 0); - } - - @VisibleForTesting - StoreScanner(final Scan scan, ScanInfo scanInfo, - ScanType scanType, final NavigableSet<byte[]> columns, - final List<? extends KeyValueScanner> scanners, long earliestPutTs) - throws IOException { - this(scan, scanInfo, scanType, columns, scanners, earliestPutTs, - // 0 is passed as readpoint because the test bypasses Store - 0); - } - - public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType, - final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs, - long readPt) throws IOException { - this(null, scan, scanInfo, columns, readPt, - scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false, scanType); - if (scanType == ScanType.USER_SCAN) { - this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, - null); - } else { - if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0) - || (scan.getStopRow() != null && scan.getStopRow().length > 0) - || !scan.getTimeRange().isAllTime() || columns != null) { - // use legacy query matcher since we do not consider the scan object in our code. Only used - // to keep compatibility for coprocessor. - matcher = LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, Long.MAX_VALUE, - earliestPutTs, oldestUnexpiredTS, now, null, null, store.getCoprocessorHost()); - } else { - this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, - earliestPutTs, oldestUnexpiredTS, now, null, null, null); - } - } - + private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners) + throws IOException { // Seek all scanners to the initial key seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled); addCurrentScanners(scanners); resetKVHeap(scanners, scanInfo.getComparator()); } - /** - * Get a filtered list of scanners. Assumes we are not in a compaction. - * @return list of scanners to seek - */ - private List<KeyValueScanner> getScannersNoCompaction() throws IOException { - return selectScannersFrom( - store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), - scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt)); + // For mob compaction only as we do not have a Store instance when doing mob compaction. + public StoreScanner(ScanInfo scanInfo, ScanType scanType, + List<? extends KeyValueScanner> scanners) throws IOException { + this(Optional.empty(), SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); + assert scanType != ScanType.USER_SCAN; + this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, + oldestUnexpiredTS, now, null, null, null); + seekAllScanner(scanInfo, scanners); + } + + // Used to instantiate a scanner for user scan in test + @VisibleForTesting + StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns, + List<? extends KeyValueScanner> scanners) throws IOException { + // 0 is passed as readpoint because the test bypasses Store + this(Optional.empty(), scan, scanInfo, columns != null ? columns.size() : 0, 0L, + scan.getCacheBlocks(), ScanType.USER_SCAN); + this.matcher = + UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); + seekAllScanner(scanInfo, scanners); + } + + // Used to instantiate a scanner for compaction in test + @VisibleForTesting + StoreScanner(ScanInfo scanInfo, OptionalInt maxVersions, ScanType scanType, + List<? extends KeyValueScanner> scanners) throws IOException { + // 0 is passed as readpoint because the test bypasses Store + this(Optional.empty(), maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt()) + : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType); + this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, + HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); + seekAllScanner(scanInfo, scanners); } @VisibleForTesting @@ -439,18 +421,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } /** - * Filters the given list of scanners using Bloom filter, time range, and - * TTL. + * Filters the given list of scanners using Bloom filter, time range, and TTL. * <p> * Will be overridden by testcase so declared as protected. */ @VisibleForTesting - protected List<KeyValueScanner> selectScannersFrom( - final List<? extends KeyValueScanner> allScanners) { + protected List<KeyValueScanner> selectScannersFrom(Store store, + List<? extends KeyValueScanner> allScanners) { boolean memOnly; boolean filesOnly; if (scan instanceof InternalScan) { - InternalScan iscan = (InternalScan)scan; + InternalScan iscan = (InternalScan) scan; memOnly = iscan.isCheckOnlyMemStore(); filesOnly = iscan.isCheckOnlyStoreFiles(); } else { @@ -462,7 +443,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // We can only exclude store files based on TTL if minVersions is set to 0. // Otherwise, we might have to return KVs that have technically expired. - long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS: Long.MIN_VALUE; + long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE; // include only those scan files which pass all filters for (KeyValueScanner kvs : allScanners) { @@ -503,10 +484,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (withDelayedScannersClose) { this.closing = true; } - // Under test, we dont have a this.store - if (this.store != null) { - this.store.deleteChangedReaderObserver(this); - } + // For mob compaction, we do not have a store. + this.store.ifPresent(s -> s.deleteChangedReaderObserver(this)); if (withDelayedScannersClose) { clearAndClose(scannersForDelayedClose); clearAndClose(memStoreScannersAfterFlush); @@ -583,7 +562,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } // Only do a sanity-check if store and comparator are available. - CellComparator comparator = store != null ? store.getComparator() : null; + CellComparator comparator = store.map(s -> s.getComparator()).orElse(null); int count = 0; long totalBytesRead = 0; @@ -895,6 +874,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @return if top of heap has changed (and KeyValueHeap has to try the next KV) */ protected final boolean reopenAfterFlush() throws IOException { + // here we can make sure that we have a Store instance. + Store store = this.store.get(); Cell lastTop = heap.peek(); // When we have the scan object, should we not pass it to getScanners() to get a limited set of // scanners? We did so in the constructor and we could have done it now by storing the scan @@ -906,7 +887,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get, scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false)); allScanners.addAll(memStoreScannersAfterFlush); - scanners = selectScannersFrom(allScanners); + scanners = selectScannersFrom(store, allScanners); // Clear the current set of flushed store files so that they don't get added again flushedStoreFiles.clear(); memStoreScannersAfterFlush.clear(); @@ -998,8 +979,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @VisibleForTesting void trySwitchToStreamRead() { - if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null || - bytesRead < preadMaxBytes) { + if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || + heap.peek() == null || bytesRead < preadMaxBytes) { return; } if (LOG.isDebugEnabled()) { @@ -1021,6 +1002,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List<KeyValueScanner> fileScanners = null; List<KeyValueScanner> newCurrentScanners; KeyValueHeap newHeap; + // We must have a store instance here + Store store = this.store.get(); try { // recreate the scanners on the current file scanners fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index d43a75b..15da298 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.OptionalInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; @@ -499,10 +499,8 @@ public abstract class Compactor<T extends CellSink> { */ protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { - Scan scan = new Scan(); - scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions()); - return new StoreScanner(store, store.getScanInfo(), scan, scanners, - scanType, smallestReadPoint, earliestPutTs); + return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType, + smallestReadPoint, earliestPutTs); } /** @@ -515,11 +513,9 @@ public abstract class Compactor<T extends CellSink> { * @return A compaction scanner. */ protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners, - long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, - byte[] dropDeletesToRow) throws IOException { - Scan scan = new Scan(); - scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions()); - return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint, - earliestPutTs, dropDeletesFromRow, dropDeletesToRow); + long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, + byte[] dropDeletesToRow) throws IOException { + return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, + smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java deleted file mode 100644 index 07fcb08..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java +++ /dev/null @@ -1,384 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.querymatcher; - -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; - -import java.io.IOException; -import java.util.Arrays; -import java.util.NavigableSet; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeepDeletedCells; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.Filter.ReturnCode; -import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; -import org.apache.hadoop.hbase.regionserver.ScanInfo; -import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; - -/** - * The old query matcher implementation. Used to keep compatibility for coprocessor that could - * overwrite the StoreScanner before compaction. Should be removed once we find a better way to do - * filtering during compaction. - */ -@Deprecated -@InterfaceAudience.Private -public class LegacyScanQueryMatcher extends ScanQueryMatcher { - - private final TimeRange tr; - - private final Filter filter; - - /** Keeps track of deletes */ - private final DeleteTracker deletes; - - /** - * The following three booleans define how we deal with deletes. There are three different - * aspects: - * <ol> - * <li>Whether to keep delete markers. This is used in compactions. Minor compactions always keep - * delete markers.</li> - * <li>Whether to keep deleted rows. This is also used in compactions, if the store is set to keep - * deleted rows. This implies keeping the delete markers as well.</li> In this case deleted rows - * are subject to the normal max version and TTL/min version rules just like "normal" rows. - * <li>Whether a scan can do time travel queries even before deleted marker to reach deleted - * rows.</li> - * </ol> - */ - /** whether to retain delete markers */ - private boolean retainDeletesInOutput; - - /** whether to return deleted rows */ - private final KeepDeletedCells keepDeletedCells; - - // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete - // marker is always removed during a major compaction. If set to non-zero - // value then major compaction will try to keep a delete marker around for - // the given number of milliseconds. We want to keep the delete markers - // around a bit longer because old puts might appear out-of-order. For - // example, during log replication between two clusters. - // - // If the delete marker has lived longer than its column-family's TTL then - // the delete marker will be removed even if time.to.purge.deletes has not - // passed. This is because all the Puts that this delete marker can influence - // would have also expired. (Removing of delete markers on col family TTL will - // not happen if min-versions is set to non-zero) - // - // But, if time.to.purge.deletes has not expired then a delete - // marker will not be removed just because there are no Puts that it is - // currently influencing. This is because Puts, that this delete can - // influence. may appear out of order. - private final long timeToPurgeDeletes; - - /** - * This variable shows whether there is an null column in the query. There always exists a null - * column in the wildcard column query. There maybe exists a null column in the explicit column - * query based on the first column. - */ - private final boolean hasNullColumn; - - /** readPoint over which the KVs are unconditionally included */ - private final long maxReadPointToTrackVersions; - - /** - * Oldest put in any of the involved store files Used to decide whether it is ok to delete family - * delete marker of this store keeps deleted KVs. - */ - protected final long earliestPutTs; - - private final byte[] stopRow; - - private byte[] dropDeletesFromRow = null, dropDeletesToRow = null; - - private LegacyScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns, - boolean hasNullColumn, DeleteTracker deletes, ScanType scanType, long readPointToUse, - long earliestPutTs, long oldestUnexpiredTS, long now) { - super(createStartKeyFromRow(scan.getStartRow(), scanInfo), scanInfo, columns, oldestUnexpiredTS, - now); - TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily()); - if (timeRange == null) { - this.tr = scan.getTimeRange(); - } else { - this.tr = timeRange; - } - this.hasNullColumn = hasNullColumn; - this.deletes = deletes; - this.filter = scan.getFilter(); - this.maxReadPointToTrackVersions = readPointToUse; - this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes(); - this.earliestPutTs = earliestPutTs; - - /* how to deal with deletes */ - this.keepDeletedCells = scanInfo.getKeepDeletedCells(); - this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES; - this.stopRow = scan.getStopRow(); - } - - private LegacyScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns, - boolean hasNullColumn, DeleteTracker deletes, ScanType scanType, long readPointToUse, - long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow, - byte[] dropDeletesToRow) { - this(scan, scanInfo, columns, hasNullColumn, deletes, scanType, readPointToUse, earliestPutTs, - oldestUnexpiredTS, now); - this.dropDeletesFromRow = Preconditions.checkNotNull(dropDeletesFromRow); - this.dropDeletesToRow = Preconditions.checkNotNull(dropDeletesToRow); - } - - @Override - public void beforeShipped() throws IOException { - super.beforeShipped(); - deletes.beforeShipped(); - } - - @Override - public MatchCode match(Cell cell) throws IOException { - if (filter != null && filter.filterAllRemaining()) { - return MatchCode.DONE_SCAN; - } - MatchCode returnCode = preCheck(cell); - if (returnCode != null) { - return returnCode; - } - /* - * The delete logic is pretty complicated now. - * This is corroborated by the following: - * 1. The store might be instructed to keep deleted rows around. - * 2. A scan can optionally see past a delete marker now. - * 3. If deleted rows are kept, we have to find out when we can - * remove the delete markers. - * 4. Family delete markers are always first (regardless of their TS) - * 5. Delete markers should not be counted as version - * 6. Delete markers affect puts of the *same* TS - * 7. Delete marker need to be version counted together with puts - * they affect - */ - long timestamp = cell.getTimestamp(); - byte typeByte = cell.getTypeByte(); - long mvccVersion = cell.getSequenceId(); - if (CellUtil.isDelete(typeByte)) { - if (keepDeletedCells == KeepDeletedCells.FALSE - || (keepDeletedCells == KeepDeletedCells.TTL && timestamp < oldestUnexpiredTS)) { - // first ignore delete markers if the scanner can do so, and the - // range does not include the marker - // - // during flushes and compactions also ignore delete markers newer - // than the readpoint of any open scanner, this prevents deleted - // rows that could still be seen by a scanner from being collected - boolean includeDeleteMarker = tr.withinOrAfterTimeRange(timestamp); - if (includeDeleteMarker && mvccVersion <= maxReadPointToTrackVersions) { - this.deletes.add(cell); - } - // Can't early out now, because DelFam come before any other keys - } - - if (timeToPurgeDeletes > 0 - && (EnvironmentEdgeManager.currentTime() - timestamp) <= timeToPurgeDeletes) { - return MatchCode.INCLUDE; - } else if (retainDeletesInOutput || mvccVersion > maxReadPointToTrackVersions) { - // always include or it is not time yet to check whether it is OK - // to purge deltes or not - // if this is not a user scan (compaction), we can filter this deletemarker right here - // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking - return MatchCode.INCLUDE; - } else if (keepDeletedCells == KeepDeletedCells.TRUE - || (keepDeletedCells == KeepDeletedCells.TTL && timestamp >= oldestUnexpiredTS)) { - if (timestamp < earliestPutTs) { - // keeping delete rows, but there are no puts older than - // this delete in the store files. - return columns.getNextRowOrNextColumn(cell); - } - // else: fall through and do version counting on the - // delete markers - } else { - return MatchCode.SKIP; - } - // note the following next else if... - // delete marker are not subject to other delete markers - } else if (!this.deletes.isEmpty()) { - DeleteResult deleteResult = deletes.isDeleted(cell); - switch (deleteResult) { - case FAMILY_DELETED: - case COLUMN_DELETED: - return columns.getNextRowOrNextColumn(cell); - case VERSION_DELETED: - case FAMILY_VERSION_DELETED: - return MatchCode.SKIP; - case NOT_DELETED: - break; - default: - throw new RuntimeException("UNEXPECTED"); - } - } - - int timestampComparison = tr.compare(timestamp); - if (timestampComparison >= 1) { - return MatchCode.SKIP; - } else if (timestampComparison <= -1) { - return columns.getNextRowOrNextColumn(cell); - } - - // STEP 1: Check if the column is part of the requested columns - MatchCode colChecker = columns.checkColumn(cell, typeByte); - if (colChecker == MatchCode.INCLUDE) { - ReturnCode filterResponse = ReturnCode.SKIP; - // STEP 2: Yes, the column is part of the requested columns. Check if filter is present - if (filter != null) { - // STEP 3: Filter the key value and return if it filters out - filterResponse = filter.filterKeyValue(cell); - switch (filterResponse) { - case SKIP: - return MatchCode.SKIP; - case NEXT_COL: - return columns.getNextRowOrNextColumn(cell); - case NEXT_ROW: - return MatchCode.SEEK_NEXT_ROW; - case SEEK_NEXT_USING_HINT: - return MatchCode.SEEK_NEXT_USING_HINT; - default: - //It means it is either include or include and seek next - break; - } - } - /* - * STEP 4: Reaching this step means the column is part of the requested columns and either - * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response. - * Now check the number of versions needed. This method call returns SKIP, INCLUDE, - * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL. - * - * FilterResponse ColumnChecker Desired behavior - * INCLUDE SKIP row has already been included, SKIP. - * INCLUDE INCLUDE INCLUDE - * INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP. - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL - * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW - * - * In all the above scenarios, we return the column checker return value except for - * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE) - */ - colChecker = columns.checkVersions(cell, timestamp, typeByte, - mvccVersion > maxReadPointToTrackVersions); - if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) { - if (colChecker != MatchCode.SKIP) { - return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW; - } - return MatchCode.SEEK_NEXT_ROW; - } - return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && - colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL - : colChecker; - } - return colChecker; - } - - @Override - public boolean hasNullColumnInQuery() { - return hasNullColumn; - } - - /** - * Handle partial-drop-deletes. As we match keys in order, when we have a range from which we can - * drop deletes, we can set retainDeletesInOutput to false for the duration of this range only, - * and maintain consistency. - */ - private void checkPartialDropDeleteRange(Cell curCell) { - // If partial-drop-deletes are used, initially, dropDeletesFromRow and dropDeletesToRow - // are both set, and the matcher is set to retain deletes. We assume ordered keys. When - // dropDeletesFromRow is leq current kv, we start dropping deletes and reset - // dropDeletesFromRow; thus the 2nd "if" starts to apply. - if ((dropDeletesFromRow != null) - && (Arrays.equals(dropDeletesFromRow, HConstants.EMPTY_START_ROW) - || (CellComparator.COMPARATOR.compareRows(curCell, dropDeletesFromRow, 0, - dropDeletesFromRow.length) >= 0))) { - retainDeletesInOutput = false; - dropDeletesFromRow = null; - } - // If dropDeletesFromRow is null and dropDeletesToRow is set, we are inside the partial- - // drop-deletes range. When dropDeletesToRow is leq current kv, we stop dropping deletes, - // and reset dropDeletesToRow so that we don't do any more compares. - if ((dropDeletesFromRow == null) && (dropDeletesToRow != null) - && !Arrays.equals(dropDeletesToRow, HConstants.EMPTY_END_ROW) && (CellComparator.COMPARATOR - .compareRows(curCell, dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) { - retainDeletesInOutput = true; - dropDeletesToRow = null; - } - } - - @Override - protected void reset() { - checkPartialDropDeleteRange(currentRow); - } - - @Override - public boolean isUserScan() { - return false; - } - - @Override - public boolean moreRowsMayExistAfter(Cell cell) { - if (this.stopRow == null || this.stopRow.length == 0) { - return true; - } - return rowComparator.compareRows(cell, stopRow, 0, stopRow.length) < 0; - } - - @Override - public Filter getFilter() { - return filter; - } - - @Override - public Cell getNextKeyHint(Cell cell) throws IOException { - if (filter == null) { - return null; - } else { - return filter.getNextCellHint(cell); - } - } - - public static LegacyScanQueryMatcher create(Scan scan, ScanInfo scanInfo, - NavigableSet<byte[]> columns, ScanType scanType, long readPointToUse, long earliestPutTs, - long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow, byte[] dropDeletesToRow, - RegionCoprocessorHost regionCoprocessorHost) throws IOException { - boolean hasNullColumn = - !(columns != null && columns.size() != 0 && columns.first().length != 0); - Pair<DeleteTracker, ColumnTracker> trackers = getTrackers(regionCoprocessorHost, null, - scanInfo, oldestUnexpiredTS, scan); - DeleteTracker deleteTracker = trackers.getFirst(); - ColumnTracker columnTracker = trackers.getSecond(); - if (dropDeletesFromRow == null) { - return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deleteTracker, - scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now); - } else { - return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deleteTracker, - scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, - dropDeletesToRow); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java index 62c8e7b..8dfc8aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.OptionalInt; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -269,19 +270,17 @@ public class TestAvoidCellReferencesIntoShippedBlocks { private InternalScanner createCompactorScanner(Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs) throws IOException { - Scan scan = new Scan(); - scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions()); - return new CompactorStoreScanner(store, store.getScanInfo(), scan, scanners, scanType, - store.getSmallestReadPoint(), earliestPutTs); + return new CompactorStoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, + scanType, store.getSmallestReadPoint(), earliestPutTs); } } private static class CompactorStoreScanner extends StoreScanner { - public CompactorStoreScanner(Store store, ScanInfo scanInfo, Scan scan, + public CompactorStoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions, List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { - super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs); + super(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 48cb812..bae8d68 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -576,9 +576,9 @@ public class TestFromClientSide { } @Override - protected List<KeyValueScanner> selectScannersFrom( + protected List<KeyValueScanner> selectScannersFrom(Store store, List<? extends KeyValueScanner> allScanners) { - List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners); + List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners); List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size()); for (KeyValueScanner scanner : scanners) { newScanners.add(new DelegatingKeyValueScanner(scanner) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 26cfed7..e9bf09b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; @@ -116,18 +117,31 @@ public class TestRegionObserverScannerOpenHook { } } + private static final InternalScanner NO_DATA = new InternalScanner() { + + @Override + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + return false; + } + + @Override + public boolean next(List<Cell> results) throws IOException { + return false; + } + + @Override + public void close() throws IOException {} + }; /** * Don't allow any data in a flush by creating a custom {@link StoreScanner}. */ public static class NoDataFromFlush implements RegionObserver { + @Override public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException { - Scan scan = new Scan(); - scan.setFilter(new NoDataFilter()); - return new StoreScanner(store, store.getScanInfo(), scan, - scanners, ScanType.COMPACT_RETAIN_DELETES, - store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + scanners.forEach(KeyValueScanner::close); + return NO_DATA; } } @@ -140,11 +154,8 @@ public class TestRegionObserverScannerOpenHook { public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) throws IOException { - Scan scan = new Scan(); - scan.setFilter(new NoDataFilter()); - return new StoreScanner(store, store.getScanInfo(), scan, scanners, - ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + scanners.forEach(KeyValueScanner::close); + return NO_DATA; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java index 2fe8085..b8e1204 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java @@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; @@ -883,13 +882,10 @@ public class TestPartitionedMobCompactor { } List<KeyValueScanner> scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, false, HConstants.LATEST_TIMESTAMP)); - Scan scan = new Scan(); - scan.setMaxVersions(hcd.getMaxVersions()); long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); long ttl = HStore.determineTTLFromFamily(hcd); ScanInfo scanInfo = new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparator.COMPARATOR); - StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null, - scanners, 0L, HConstants.LATEST_TIMESTAMP); + StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners); List<Cell> results = new ArrayList<>(); boolean hasMore = true; http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java index b090cdd..5423578 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java @@ -19,9 +19,9 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.NavigableSet; +import java.util.OptionalInt; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; * of functionality still behaves as expected. */ public class NoOpScanPolicyObserver implements RegionObserver { + /** * Reimplement the default behavior */ @@ -45,11 +46,9 @@ public class NoOpScanPolicyObserver implements RegionObserver { public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException { ScanInfo oldSI = store.getScanInfo(); - ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), oldSI.getTtl(), - oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); - Scan scan = new Scan(); - scan.setMaxVersions(oldSI.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, scanners, + ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), + oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } @@ -57,16 +56,15 @@ public class NoOpScanPolicyObserver implements RegionObserver { * Reimplement the default behavior */ @Override - public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, - Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, + public InternalScanner preCompactScannerOpen( + final ObserverContext<RegionCoprocessorEnvironment> c, Store store, + List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s) throws IOException { // this demonstrates how to override the scanners default behavior ScanInfo oldSI = store.getScanInfo(); - ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), oldSI.getTtl(), - oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); - Scan scan = new Scan(); - scan.setMaxVersions(oldSI.getMaxVersions()); - return new StoreScanner(store, scanInfo, scan, scanners, scanType, + ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), + oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners, scanType, store.getSmallestReadPoint(), earliestPutTs); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 8118e41..dc3cf4d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -194,25 +194,25 @@ public class TestCompactingMemStore extends TestDefaultMemStore { for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); - ScanType scanType = ScanType.USER_SCAN; - InternalScanner scanner = new StoreScanner(new Scan( - Bytes.toBytes(startRowId)), scanInfo, scanType, null, - memstore.getScanners(0)); - List<Cell> results = new ArrayList<>(); - for (int i = 0; scanner.next(results); i++) { - int rowId = startRowId + i; - Cell left = results.get(0); - byte[] row1 = Bytes.toBytes(rowId); - assertTrue("Row name", + try (InternalScanner scanner = + new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, + memstore.getScanners(0))) { + List<Cell> results = new ArrayList<>(); + for (int i = 0; scanner.next(results); i++) { + int rowId = startRowId + i; + Cell left = results.get(0); + byte[] row1 = Bytes.toBytes(rowId); + assertTrue("Row name", CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0); - assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); - List<Cell> row = new ArrayList<>(); - for (Cell kv : results) { - row.add(kv); + assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); + List<Cell> row = new ArrayList<>(); + for (Cell kv : results) { + row.add(kv); + } + isExpectedRowWithoutTimestamps(rowId, row); + // Clear out set. Otherwise row results accumulate. + results.clear(); } - isExpectedRowWithoutTimestamps(rowId, row); - // Clear out set. Otherwise row results accumulate. - results.clear(); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 7b10846..b36b8fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -18,9 +18,17 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,22 +63,13 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.Rule; import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertNotNull; - import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.junit.rules.TestRule; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; /** memstore test case */ @Category({RegionServerTests.class, MediumTests.class}) @@ -164,10 +163,8 @@ public class TestDefaultMemStore { Configuration conf = HBaseConfiguration.create(); ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); - ScanType scanType = ScanType.USER_SCAN; - StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); int count = 0; - try { + try (StoreScanner s = new StoreScanner(scan, scanInfo, null, memstorescanners)) { while (s.next(result)) { LOG.info(result); count++; @@ -175,8 +172,6 @@ public class TestDefaultMemStore { assertEquals(rowCount, result.size()); result.clear(); } - } finally { - s.close(); } assertEquals(rowCount, count); for (KeyValueScanner scanner : memstorescanners) { @@ -185,9 +180,8 @@ public class TestDefaultMemStore { memstorescanners = this.memstore.getScanners(mvcc.getReadPoint()); // Now assert can count same number even if a snapshot mid-scan. - s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); count = 0; - try { + try (StoreScanner s = new StoreScanner(scan, scanInfo, null, memstorescanners)) { while (s.next(result)) { LOG.info(result); // Assert the stuff is coming out in right order. @@ -201,8 +195,6 @@ public class TestDefaultMemStore { } result.clear(); } - } finally { - s.close(); } assertEquals(rowCount, count); for (KeyValueScanner scanner : memstorescanners) { @@ -211,10 +203,9 @@ public class TestDefaultMemStore { memstorescanners = this.memstore.getScanners(mvcc.getReadPoint()); // Assert that new values are seen in kvset as we scan. long ts = System.currentTimeMillis(); - s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); count = 0; int snapshotIndex = 5; - try { + try (StoreScanner s = new StoreScanner(scan, scanInfo, null, memstorescanners)) { while (s.next(result)) { LOG.info(result); // Assert the stuff is coming out in right order. @@ -225,14 +216,12 @@ public class TestDefaultMemStore { if (count == snapshotIndex) { MemStoreSnapshot snapshot = this.memstore.snapshot(); this.memstore.clearSnapshot(snapshot.getId()); - // Added more rows into kvset. But the scanner wont see these rows. + // Added more rows into kvset. But the scanner wont see these rows. addRows(this.memstore, ts); LOG.info("Snapshotted, cleared it and then added values (which wont be seen)"); } result.clear(); } - } finally { - s.close(); } assertEquals(rowCount, count); } @@ -600,27 +589,26 @@ public class TestDefaultMemStore { //starting from each row, validate results should contain the starting row Configuration conf = HBaseConfiguration.create(); for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { - ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, - KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); - ScanType scanType = ScanType.USER_SCAN; - try (InternalScanner scanner = new StoreScanner(new Scan( - Bytes.toBytes(startRowId)), scanInfo, scanType, null, - memstore.getScanners(0))) { + ScanInfo scanInfo = + new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false); + try (InternalScanner scanner = + new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null, + memstore.getScanners(0))) { List<Cell> results = new ArrayList<>(); for (int i = 0; scanner.next(results); i++) { int rowId = startRowId + i; Cell left = results.get(0); byte[] row1 = Bytes.toBytes(rowId); - assertTrue( - "Row name", - CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0); + assertTrue("Row name", + CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0); assertEquals("Count of columns", QUALIFIER_COUNT, results.size()); List<Cell> row = new ArrayList<>(); for (Cell kv : results) { row.add(kv); } isExpectedRowWithoutTimestamps(rowId, row); - // Clear out set. Otherwise row results accumulate. + // Clear out set. Otherwise row results accumulate. results.clear(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java index 21089ed..9ab1440 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java @@ -442,14 +442,11 @@ public class TestMobStoreCompaction { List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, false, HConstants.LATEST_TIMESTAMP); - Scan scan = new Scan(); - scan.setMaxVersions(hcd.getMaxVersions()); long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0); long ttl = HStore.determineTTLFromFamily(hcd); ScanInfo scanInfo = new ScanInfo(copyOfConf, hcd, ttl, timeToPurgeDeletes, CellComparator.COMPARATOR); - StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_DROP_DELETES, null, - scanners, 0L, HConstants.LATEST_TIMESTAMP); + StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_DROP_DELETES, scanners); try { size += UTIL.countRows(scanner); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index 3782fdb..8b34a2f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -68,6 +66,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; + /** * Test cases against ReversibleKeyValueScanner */ @@ -263,7 +263,6 @@ public class TestReversibleScanners { StoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE, true); - ScanType scanType = ScanType.USER_SCAN; ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR, false); @@ -271,16 +270,15 @@ public class TestReversibleScanners { // Case 1.Test a full reversed scan Scan scan = new Scan(); scan.setReversed(true); - StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, - scan, scanType, scanInfo, MAXMVCC); + StoreScanner storeScanner = + getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false); // Case 2.Test reversed scan with a specified start row int startRowNum = ROWSIZE / 2; byte[] startRow = ROWS[startRowNum]; scan.withStartRow(startRow); - storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, - scanType, scanInfo, MAXMVCC); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1), startRowNum + 1, false); @@ -289,16 +287,14 @@ public class TestReversibleScanners { assertTrue(QUALSIZE > 2); scan.addColumn(FAMILYNAME, QUALS[0]); scan.addColumn(FAMILYNAME, QUALS[2]); - storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, - scanType, scanInfo, MAXMVCC); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC); verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1, false); // Case 4.Test reversed scan with mvcc based on case 3 for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { LOG.info("Setting read point to " + readPoint); - storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, - scanType, scanInfo, readPoint); + storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint); int expectedRowCount = 0; int expectedKVCount = 0; for (int i = startRowNum; i >= 0; i--) { @@ -423,7 +419,7 @@ public class TestReversibleScanners { } private StoreScanner getReversibleStoreScanner(MemStore memstore, - StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType, + StoreFile sf1, StoreFile sf2, Scan scan, ScanInfo scanInfo, int readPoint) throws IOException { List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null, false, readPoint); @@ -434,7 +430,7 @@ public class TestReversibleScanners { columns = entry.getValue(); } StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, - scanType, columns, scanners); + columns, scanners); return storeScanner; }