HBASE-16324 Remove LegacyScanQueryMatcher

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8d33949b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8d33949b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8d33949b

Branch: refs/heads/master
Commit: 8d33949b8db072902783f63cd9aaa68cbd6b905f
Parents: 2773510
Author: zhangduo <zhang...@apache.org>
Authored: Fri Aug 25 17:02:03 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Sat Aug 26 08:04:43 2017 +0800

----------------------------------------------------------------------
 .../example/ZooKeeperScanPolicyObserver.java    |  10 +-
 .../hbase/mob/DefaultMobStoreCompactor.java     |   6 +-
 .../compactions/PartitionedMobCompactor.java    |   8 +-
 .../hadoop/hbase/regionserver/HMobStore.java    |   1 -
 .../MemStoreCompactorSegmentsIterator.java      |  26 +-
 .../regionserver/ReversedStoreScanner.java      |   8 +-
 .../hadoop/hbase/regionserver/StoreFlusher.java |  10 +-
 .../hadoop/hbase/regionserver/StoreScanner.java | 217 +++---
 .../regionserver/compactions/Compactor.java     |  18 +-
 .../querymatcher/LegacyScanQueryMatcher.java    | 384 -----------
 ...estAvoidCellReferencesIntoShippedBlocks.java |  11 +-
 .../hadoop/hbase/client/TestFromClientSide.java |   4 +-
 .../TestRegionObserverScannerOpenHook.java      |  31 +-
 .../TestPartitionedMobCompactor.java            |   6 +-
 .../regionserver/NoOpScanPolicyObserver.java    |  24 +-
 .../regionserver/TestCompactingMemStore.java    |  34 +-
 .../hbase/regionserver/TestDefaultMemStore.java |  66 +-
 .../regionserver/TestMobStoreCompaction.java    |   5 +-
 .../regionserver/TestReversibleScanners.java    |  22 +-
 .../hbase/regionserver/TestStoreScanner.java    | 682 +++++++++----------
 .../hbase/util/TestCoprocessorScanPolicy.java   |  24 +-
 21 files changed, 552 insertions(+), 1045 deletions(-)
----------------------------------------------------------------------
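
Before this change, every compaction-time caller built a throwaway Scan whose only
purpose was to carry a max-versions setting into the StoreScanner constructor; that
Scan also forced StoreScanner to keep the deprecated LegacyScanQueryMatcher around
for coprocessor compatibility. The patch replaces the Scan parameter with an
OptionalInt max-versions hint, where OptionalInt.empty() keeps the column family's
configured maximum. A minimal before/after sketch of the caller side, using the
HBase-private types that appear in this diff (Store, ScanInfo, StoreScanner,
ScanType):

    // Before (removed by this patch): a throwaway Scan carried the hint.
    //   Scan scan = new Scan();
    //   scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions());
    //   InternalScanner scanner = new StoreScanner(store, store.getScanInfo(), scan,
    //       scanners, scanType, smallestReadPoint, earliestPutTs);

    // After: the hint travels as an OptionalInt; empty() keeps the family default.
    InternalScanner scanner = new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(),
        scanners, scanType, smallestReadPoint, earliestPutTs);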


http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
index 35f85f7..b489fe4 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
@@ -19,9 +19,9 @@
 package org.apache.hadoop.hbase.coprocessor.example;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.OptionalInt;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -194,9 +194,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
       // take default action
       return null;
     }
-    Scan scan = new Scan();
-    scan.setMaxVersions(scanInfo.getMaxVersions());
-    return new StoreScanner(store, scanInfo, scan, scanners,
+    return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners,
        ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
   }
 
@@ -210,9 +208,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
       // take default action
       return null;
     }
-    Scan scan = new Scan();
-    scan.setMaxVersions(scanInfo.getMaxVersions());
-    return new StoreScanner(store, scanInfo, scan, scanners, scanType,
+    return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners, scanType,
         store.getSmallestReadPoint(), earliestPutTs);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index c475b17..89d2958 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -22,6 +22,7 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.OptionalInt;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,7 +33,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.CellSink;
 import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
@@ -74,9 +74,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     @Override
     public InternalScanner createScanner(List<StoreFileScanner> scanners,
         ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
-      Scan scan = new Scan();
-      scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions());
-      return new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
+      return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
           smallestReadPoint, fd.earliestPutTs);
     }
   };

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index c378a88..2308ddf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
@@ -805,15 +804,12 @@ public class PartitionedMobCompactor extends MobCompactor {
    * @throws IOException if IO failure is encountered
    */
   private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
-    throws IOException {
+      throws IOException {
     List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(filesToCompact,
       false, true, false, false, HConstants.LATEST_TIMESTAMP);
-    Scan scan = new Scan();
-    scan.setMaxVersions(column.getMaxVersions());
     long ttl = HStore.determineTTLFromFamily(column);
     ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.COMPARATOR);
-    return new StoreScanner(scan, scanInfo, scanType, null, scanners, 0L,
-      HConstants.LATEST_TIMESTAMP);
+    return new StoreScanner(scanInfo, scanType, scanners);
   }
 
   /**
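
Mob compaction is the one caller without a live Store (it scans raw mob files
directly), so it switches to a new Store-less StoreScanner constructor that this
patch adds: StoreScanner(ScanInfo, ScanType, List<? extends KeyValueScanner>).
Internally that constructor uses a shared dummy Scan (SCAN_FOR_COMPACTION) and
asserts the scan type is not USER_SCAN. A sketch of the resulting call path,
assuming scanners and the column descriptor have been resolved as in the method
above:

    // No Store instance here: build ScanInfo from the family descriptor and let the
    // Store-less constructor wire up the compaction query matcher itself.
    long ttl = HStore.determineTTLFromFamily(column);
    ScanInfo scanInfo = new ScanInfo(conf, column, ttl, 0, CellComparator.COMPARATOR);
    StoreScanner scanner = new StoreScanner(scanInfo, scanType, scanners);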

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index fb837e8..f38ffb5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
index 8f481e0..5386c7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
@@ -19,16 +19,16 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.OptionalInt;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
  * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
@@ -106,23 +106,15 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
 
   /**
    * Creates the scanner for compacting the pipeline.
-   *
    * @return the scanner
    */
   private StoreScanner createScanner(Store store, List<KeyValueScanner> scanners)
       throws IOException {
-
-    Scan scan = new Scan();
-    scan.setMaxVersions();  //Get all available versions
-    StoreScanner internalScanner =
-        new StoreScanner(store, store.getScanInfo(), scan, scanners,
-            ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
-            HConstants.OLDEST_TIMESTAMP);
-
-    return internalScanner;
+    // Get all available versions
+    return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(Integer.MAX_VALUE), scanners,
+        ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
   }
 
-
   /* Refill kev-value set (should be invoked only when KVS is empty)
    * Returns true if KVS is non-empty */
   private boolean refillKVS() {
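
This is the one call site that wants every version rather than the family default,
hence OptionalInt.of(Integer.MAX_VALUE) instead of OptionalInt.empty(). The hint
protocol in plain JDK terms (resolveMaxVersions is a hypothetical helper for
illustration, not a method in this patch):

    import java.util.OptionalInt;

    public class MaxVersionsHint {
      // Hypothetical helper: resolve the hint against a family default.
      static int resolveMaxVersions(OptionalInt requested, int familyMaxVersions) {
        return requested.orElse(familyMaxVersions);
      }

      public static void main(String[] args) {
        int familyDefault = 3;
        // empty() -> keep the column family's configured maximum (most callers).
        System.out.println(resolveMaxVersions(OptionalInt.empty(), familyDefault)); // 3
        // of(Integer.MAX_VALUE) -> all versions, replacing scan.setMaxVersions().
        System.out.println(resolveMaxVersions(OptionalInt.of(Integer.MAX_VALUE), familyDefault));
      }
    }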

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
index 07f98ad..86b9ea9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java
@@ -53,11 +53,9 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
   }
 
   /** Constructor for testing. */
-  ReversedStoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
-      final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners)
-      throws IOException {
-    super(scan, scanInfo, scanType, columns, scanners,
-        HConstants.LATEST_TIMESTAMP);
+  ReversedStoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
+      List<? extends KeyValueScanner> scanners) throws IOException {
+    super(scan, scanInfo, columns, scanners);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index 298f3d4..d29d48f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.OptionalInt;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -30,7 +30,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
@@ -86,11 +85,8 @@ abstract class StoreFlusher {
           smallestReadPoint);
     }
     if (scanner == null) {
-      Scan scan = new Scan();
-      scan.setMaxVersions(store.getScanInfo().getMaxVersions());
-      scanner = new StoreScanner(store, store.getScanInfo(), scan,
-          snapshotScanners, ScanType.COMPACT_RETAIN_DELETES,
-          smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+      scanner = new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners,
+          ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
     }
     assert scanner != null;
     if (store.getCoprocessorHost() != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index a220f54..9a096f6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -24,6 +24,8 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -45,7 +47,6 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
 import org.apache.hadoop.hbase.regionserver.querymatcher.CompactionScanQueryMatcher;
-import org.apache.hadoop.hbase.regionserver.querymatcher.LegacyScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
 import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
 import org.apache.hadoop.hbase.util.CollectionUtils;
@@ -66,7 +67,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   private static final Log LOG = LogFactory.getLog(StoreScanner.class);
   // In unit tests, the store could be null
-  protected final Store store;
+  protected final Optional<Store> store;
   private ScanQueryMatcher matcher;
   protected KeyValueHeap heap;
   private boolean cacheBlocks;
@@ -166,14 +167,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   /** An internal constructor. */
-  protected StoreScanner(Store store, Scan scan, final ScanInfo scanInfo,
-      final NavigableSet<byte[]> columns, long readPt, boolean cacheBlocks, ScanType scanType) {
+  private StoreScanner(Optional<Store> store, Scan scan, ScanInfo scanInfo,
+      int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) {
     this.readPt = readPt;
     this.store = store;
     this.cacheBlocks = cacheBlocks;
     get = scan.isGetScan();
-    int numCol = columns == null ? 0 : columns.size();
-    explicitColumnQuery = numCol > 0;
+    explicitColumnQuery = numColumns > 0;
     this.scan = scan;
     this.now = EnvironmentEdgeManager.currentTime();
     this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
@@ -183,13 +183,12 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
      // the seek operation. However, we also look the row-column Bloom filter
      // for multi-row (non-"get") scans because this is not done in
      // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
-     this.useRowColBloom = numCol > 1 || (!get && numCol == 1);
-
+     this.useRowColBloom = numColumns > 1 || (!get && numColumns == 1);
      this.maxRowSize = scanInfo.getTableMaxRowSize();
     if (get) {
       this.readType = Scan.ReadType.PREAD;
       this.scanUsePread = true;
-    } else if(scanType != scanType.USER_SCAN) {
+    } else if(scanType != ScanType.USER_SCAN) {
+      // For compaction scanners never use Pread as already we have stream based scanners on the
       // store files to be compacted
       this.readType = Scan.ReadType.STREAM;
@@ -207,13 +206,15 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     this.preadMaxBytes = scanInfo.getPreadMaxBytes();
     this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
     // Parallel seeking is on if the config allows and more there is more than one store file.
-    if (this.store != null && this.store.getStorefilesCount() > 1) {
-      RegionServerServices rsService = ((HStore) store).getHRegion().getRegionServerServices();
-      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
-        this.parallelSeekEnabled = true;
-        this.executor = rsService.getExecutorService();
+    this.store.ifPresent(s -> {
+      if (s.getStorefilesCount() > 1) {
+        RegionServerServices rsService = ((HStore) s).getHRegion().getRegionServerServices();
+        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
+          this.parallelSeekEnabled = true;
+          this.executor = rsService.getExecutorService();
+        }
       }
-    }
+    });
   }
 
   private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
@@ -229,21 +230,23 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * @param columns which columns we are scanning
    * @throws IOException
    */
-  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns,
-      long readPt)
-  throws IOException {
-    this(store, scan, scanInfo, columns, readPt, scan.getCacheBlocks(), ScanType.USER_SCAN);
+  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+      long readPt) throws IOException {
+    this(Optional.of(store), scan, scanInfo, columns != null ? columns.size() : 0, readPt,
+        scan.getCacheBlocks(), ScanType.USER_SCAN);
     if (columns != null && scan.isRaw()) {
       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
     }
     matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
       store.getCoprocessorHost());
 
-    this.store.addChangedReaderObserver(this);
+    store.addChangedReaderObserver(this);
 
     try {
       // Pass columns to try to filter out unnecessary StoreFiles.
-      List<KeyValueScanner> scanners = getScannersNoCompaction();
+      List<KeyValueScanner> scanners = selectScannersFrom(store,
+        store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
+          scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
 
      // Seek all scanners to the start of the Row (or if the exact matching row
       // key does not exist, then to the start of the next matching Row).
@@ -263,66 +266,61 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     } catch (IOException e) {
      // remove us from the HStore#changedReaderObservers here or we'll have no chance to
       // and might cause memory leak
-      this.store.deleteChangedReaderObserver(this);
+      store.deleteChangedReaderObserver(this);
       throw e;
     }
   }
 
+  // a dummy scan instance for compaction.
+  private static final Scan SCAN_FOR_COMPACTION = new Scan();
+
   /**
-   * Used for compactions.<p>
-   *
+   * Used for compactions.
+   * <p>
    * Opens a scanner across specified StoreFiles.
    * @param store who we scan
-   * @param scan the spec
    * @param scanners ancillary scanners
-   * @param smallestReadPoint the readPoint that we should use for tracking
-   *          versions
-   * @param smallestReadPoint the readPoint that we should use for tracking versions
    */
-  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
-      List<? extends KeyValueScanner> scanners, ScanType scanType,
-      long smallestReadPoint, long earliestPutTs) throws IOException {
-    this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
+  public StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions,
+      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
+      long earliestPutTs) throws IOException {
+    this(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs, null,
+        null);
   }
 
   /**
-   * Used for compactions that drop deletes from a limited range of rows.<p>
-   *
+   * Used for compactions that drop deletes from a limited range of rows.
+   * <p>
    * Opens a scanner across specified StoreFiles.
    * @param store who we scan
-   * @param scan the spec
    * @param scanners ancillary scanners
    * @param smallestReadPoint the readPoint that we should use for tracking versions
    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
    */
-  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+  public StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions,
       List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
       byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
-    this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
+    this(store, scanInfo, maxVersions, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
   }
 
-  private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+  private StoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions,
       List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
       long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
-    this(store, scan, scanInfo, null,
-        ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
-    if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
-        || (scan.getStopRow() != null && scan.getStopRow().length > 0)
-        || !scan.getTimeRange().isAllTime()) {
-      // use legacy query matcher since we do not consider the scan object in our code. Only used to
-      // keep compatibility for coprocessor.
-      matcher = LegacyScanQueryMatcher.create(scan, scanInfo, null, scanType, smallestReadPoint,
-        earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
-        store.getCoprocessorHost());
-    } else {
-      matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint,
-        earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
-        store.getCoprocessorHost());
-    }
+    this(Optional.of(store),
+        maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
+            : SCAN_FOR_COMPACTION,
+        scanInfo, 0, ((HStore) store).getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED),
+        false, scanType);
+    assert scanType != ScanType.USER_SCAN;
+    matcher =
+        CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
+          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
 
     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
-    scanners = selectScannersFrom(scanners);
+    scanners = selectScannersFrom(store, scanners);
 
     // Seek all scanners to the initial key
     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
@@ -331,62 +329,46 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     resetKVHeap(scanners, store.getComparator());
   }
 
-  @VisibleForTesting
-  StoreScanner(final Scan scan, ScanInfo scanInfo,
-      ScanType scanType, final NavigableSet<byte[]> columns,
-      final List<? extends KeyValueScanner> scanners) throws IOException {
-    this(scan, scanInfo, scanType, columns, scanners,
-        HConstants.LATEST_TIMESTAMP,
-        // 0 is passed as readpoint because the test bypasses Store
-        0);
-  }
-
-  @VisibleForTesting
-  StoreScanner(final Scan scan, ScanInfo scanInfo,
-    ScanType scanType, final NavigableSet<byte[]> columns,
-    final List<? extends KeyValueScanner> scanners, long earliestPutTs)
-        throws IOException {
-    this(scan, scanInfo, scanType, columns, scanners, earliestPutTs,
-      // 0 is passed as readpoint because the test bypasses Store
-      0);
-  }
-
-  public StoreScanner(final Scan scan, ScanInfo scanInfo, ScanType scanType,
-      final NavigableSet<byte[]> columns, final List<? extends KeyValueScanner> scanners, long earliestPutTs,
-      long readPt) throws IOException {
-    this(null, scan, scanInfo, columns, readPt,
-        scanType == ScanType.USER_SCAN ? scan.getCacheBlocks() : false, scanType);
-    if (scanType == ScanType.USER_SCAN) {
-      this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now,
-        null);
-    } else {
-      if (scan.hasFilter() || (scan.getStartRow() != null && scan.getStartRow().length > 0)
-          || (scan.getStopRow() != null && scan.getStopRow().length > 0)
-          || !scan.getTimeRange().isAllTime() || columns != null) {
-        // use legacy query matcher since we do not consider the scan object in our code. Only used
-        // to keep compatibility for coprocessor.
-        matcher = LegacyScanQueryMatcher.create(scan, scanInfo, columns, scanType, Long.MAX_VALUE,
-          earliestPutTs, oldestUnexpiredTS, now, null, null, store.getCoprocessorHost());
-      } else {
-        this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
-          earliestPutTs, oldestUnexpiredTS, now, null, null, null);
-      }
-    }
-
+  private void seekAllScanner(ScanInfo scanInfo, List<? extends KeyValueScanner> scanners)
+      throws IOException {
     // Seek all scanners to the initial key
     seekScanners(scanners, matcher.getStartKey(), false, parallelSeekEnabled);
     addCurrentScanners(scanners);
     resetKVHeap(scanners, scanInfo.getComparator());
   }
 
-  /**
-   * Get a filtered list of scanners. Assumes we are not in a compaction.
-   * @return list of scanners to seek
-   */
-  private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
-    return selectScannersFrom(
-      store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
-        scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
+  // For mob compaction only as we do not have a Store instance when doing mob compaction.
+  public StoreScanner(ScanInfo scanInfo, ScanType scanType,
+      List<? extends KeyValueScanner> scanners) throws IOException {
+    this(Optional.empty(), SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
+    assert scanType != ScanType.USER_SCAN;
+    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
+      oldestUnexpiredTS, now, null, null, null);
+    seekAllScanner(scanInfo, scanners);
+  }
+
+  // Used to instantiate a scanner for user scan in test
+  @VisibleForTesting
+  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
+      List<? extends KeyValueScanner> scanners) throws IOException {
+    // 0 is passed as readpoint because the test bypasses Store
+    this(Optional.empty(), scan, scanInfo, columns != null ? columns.size() : 0, 0L,
+        scan.getCacheBlocks(), ScanType.USER_SCAN);
+    this.matcher =
+        UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
+    seekAllScanner(scanInfo, scanners);
+  }
+
+  // Used to instantiate a scanner for compaction in test
+  @VisibleForTesting
+  StoreScanner(ScanInfo scanInfo, OptionalInt maxVersions, ScanType scanType,
+      List<? extends KeyValueScanner> scanners) throws IOException {
+    // 0 is passed as readpoint because the test bypasses Store
+    this(Optional.empty(), maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
+        : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
+    this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
+      HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
+    seekAllScanner(scanInfo, scanners);
   }
 
   @VisibleForTesting
@@ -439,18 +421,17 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   }
 
   /**
-   * Filters the given list of scanners using Bloom filter, time range, and
-   * TTL.
+   * Filters the given list of scanners using Bloom filter, time range, and TTL.
    * <p>
    * Will be overridden by testcase so declared as protected.
    */
   @VisibleForTesting
-  protected List<KeyValueScanner> selectScannersFrom(
-      final List<? extends KeyValueScanner> allScanners) {
+  protected List<KeyValueScanner> selectScannersFrom(Store store,
+      List<? extends KeyValueScanner> allScanners) {
     boolean memOnly;
     boolean filesOnly;
     if (scan instanceof InternalScan) {
-      InternalScan iscan = (InternalScan)scan;
+      InternalScan iscan = (InternalScan) scan;
       memOnly = iscan.isCheckOnlyMemStore();
       filesOnly = iscan.isCheckOnlyStoreFiles();
     } else {
@@ -462,7 +443,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
     // We can only exclude store files based on TTL if minVersions is set to 0.
     // Otherwise, we might have to return KVs that have technically expired.
-    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS: Long.MIN_VALUE;
+    long expiredTimestampCutoff = minVersions == 0 ? oldestUnexpiredTS : Long.MIN_VALUE;
 
     // include only those scan files which pass all filters
     for (KeyValueScanner kvs : allScanners) {
@@ -503,10 +484,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     if (withDelayedScannersClose) {
       this.closing = true;
     }
-    // Under test, we dont have a this.store
-    if (this.store != null) {
-      this.store.deleteChangedReaderObserver(this);
-    }
+    // For mob compaction, we do not have a store.
+    this.store.ifPresent(s -> s.deleteChangedReaderObserver(this));
     if (withDelayedScannersClose) {
       clearAndClose(scannersForDelayedClose);
       clearAndClose(memStoreScannersAfterFlush);
@@ -583,7 +562,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
 
     // Only do a sanity-check if store and comparator are available.
-    CellComparator comparator = store != null ? store.getComparator() : null;
+    CellComparator comparator = store.map(s -> s.getComparator()).orElse(null);
 
     int count = 0;
     long totalBytesRead = 0;
@@ -895,6 +874,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * @return if top of heap has changed (and KeyValueHeap has to try the next KV)
    */
   protected final boolean reopenAfterFlush() throws IOException {
+    // here we can make sure that we have a Store instance.
+    Store store = this.store.get();
     Cell lastTop = heap.peek();
    // When we have the scan object, should we not pass it to getScanners() to get a limited set of
    // scanners? We did so in the constructor and we could have done it now by storing the scan
@@ -906,7 +887,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       allScanners.addAll(store.getScanners(flushedStoreFiles, cacheBlocks, get,
        scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, false));
       allScanners.addAll(memStoreScannersAfterFlush);
-      scanners = selectScannersFrom(allScanners);
+      scanners = selectScannersFrom(store, allScanners);
      // Clear the current set of flushed store files so that they don't get added again
       flushedStoreFiles.clear();
       memStoreScannersAfterFlush.clear();
@@ -998,8 +979,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
   @VisibleForTesting
   void trySwitchToStreamRead() {
-    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||
-        bytesRead < preadMaxBytes) {
+    if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing ||
+        heap.peek() == null || bytesRead < preadMaxBytes) {
       return;
     }
     if (LOG.isDebugEnabled()) {
@@ -1021,6 +1002,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     List<KeyValueScanner> fileScanners = null;
     List<KeyValueScanner> newCurrentScanners;
     KeyValueHeap newHeap;
+    // We must have a store instance here
+    Store store = this.store.get();
     try {
       // recreate the scanners on the current file scanners
      fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
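
With the nullable store field replaced by Optional<Store>, the use sites above settle
into three idioms: ifPresent(...) for side effects that only apply with a real store,
map(...).orElse(null) for derived values, and a bare get() on paths that are only
reachable when a store exists (flush handling, pread-to-stream switching). A
self-contained JDK sketch of the same three patterns, with String standing in for
Store:

    import java.util.Optional;

    public class OptionalFieldPatterns {
      private final Optional<String> store; // stands in for Optional<Store>

      OptionalFieldPatterns(Optional<String> store) {
        this.store = store;
      }

      void close() {
        // Side effect only when present; mob compaction constructs with empty().
        store.ifPresent(s -> System.out.println("deregister observer on " + s));
      }

      String comparatorName() {
        // Derived value, null when absent -- mirrors store.map(s -> s.getComparator()).orElse(null).
        return store.map(String::toUpperCase).orElse(null);
      }

      String reopenAfterFlush() {
        // Only reachable with a real store, so get() is safe here.
        return store.get();
      }

      public static void main(String[] args) {
        OptionalFieldPatterns scanner = new OptionalFieldPatterns(Optional.of("cf1"));
        scanner.close();
        System.out.println(scanner.comparatorName());
        System.out.println(scanner.reopenAfterFlush());
      }
    }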

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index d43a75b..15da298 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalInt;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
@@ -499,10 +499,8 @@ public abstract class Compactor<T extends CellSink> {
    */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
-    Scan scan = new Scan();
-    scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions());
-    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
-        scanType, smallestReadPoint, earliestPutTs);
+    return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
+        smallestReadPoint, earliestPutTs);
   }
 
   /**
@@ -515,11 +513,9 @@ public abstract class Compactor<T extends CellSink> {
    * @return A compaction scanner.
    */
  protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
-     long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
-     byte[] dropDeletesToRow) throws IOException {
-    Scan scan = new Scan();
-    scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions());
-    return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
-        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
+      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+      byte[] dropDeletesToRow) throws IOException {
+    return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners,
+        smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
   }
 }
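
Both factory methods now pass OptionalInt.empty(), so compaction keeps honoring the
family's configured max versions exactly as before. A subclass can now state an
explicit hint directly, the way MemStoreCompactorSegmentsIterator requests
Integer.MAX_VALUE above; a hypothetical sketch (SingleVersionCompactor is not part
of this patch, and the DefaultCompactor constructor signature is assumed):

    // Hypothetical Compactor subclass passing an explicit max-versions hint
    // instead of the family default.
    public class SingleVersionCompactor extends DefaultCompactor {
      public SingleVersionCompactor(Configuration conf, Store store) {
        super(conf, store);
      }

      @Override
      protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
        return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(1), scanners,
            scanType, smallestReadPoint, earliestPutTs);
      }
    }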

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java
deleted file mode 100644
index 07fcb08..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/LegacyScanQueryMatcher.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.querymatcher;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.NavigableSet;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeepDeletedCells;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
-import org.apache.hadoop.hbase.io.TimeRange;
-import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
-import org.apache.hadoop.hbase.regionserver.ScanInfo;
-import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker.DeleteResult;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-
-/**
- * The old query matcher implementation. Used to keep compatibility for coprocessor that could
- * overwrite the StoreScanner before compaction. Should be removed once we find a better way to do
- * filtering during compaction.
- */
-@Deprecated
-@InterfaceAudience.Private
-public class LegacyScanQueryMatcher extends ScanQueryMatcher {
-
-  private final TimeRange tr;
-
-  private final Filter filter;
-
-  /** Keeps track of deletes */
-  private final DeleteTracker deletes;
-
-  /**
-   * The following three booleans define how we deal with deletes. There are three different
-   * aspects:
-   * <ol>
-   * <li>Whether to keep delete markers. This is used in compactions. Minor compactions always keep
-   * delete markers.</li>
-   * <li>Whether to keep deleted rows. This is also used in compactions, if the store is set to keep
-   * deleted rows. This implies keeping the delete markers as well.</li> In this case deleted rows
-   * are subject to the normal max version and TTL/min version rules just like "normal" rows.
-   * <li>Whether a scan can do time travel queries even before deleted marker to reach deleted
-   * rows.</li>
-   * </ol>
-   */
-  /** whether to retain delete markers */
-  private boolean retainDeletesInOutput;
-
-  /** whether to return deleted rows */
-  private final KeepDeletedCells keepDeletedCells;
-
-  // By default, when hbase.hstore.time.to.purge.deletes is 0ms, a delete
-  // marker is always removed during a major compaction. If set to non-zero
-  // value then major compaction will try to keep a delete marker around for
-  // the given number of milliseconds. We want to keep the delete markers
-  // around a bit longer because old puts might appear out-of-order. For
-  // example, during log replication between two clusters.
-  //
-  // If the delete marker has lived longer than its column-family's TTL then
-  // the delete marker will be removed even if time.to.purge.deletes has not
-  // passed. This is because all the Puts that this delete marker can influence
-  // would have also expired. (Removing of delete markers on col family TTL will
-  // not happen if min-versions is set to non-zero)
-  //
-  // But, if time.to.purge.deletes has not expired then a delete
-  // marker will not be removed just because there are no Puts that it is
-  // currently influencing. This is because Puts, that this delete can
-  // influence. may appear out of order.
-  private final long timeToPurgeDeletes;
-
-  /**
-   * This variable shows whether there is an null column in the query. There always exists a null
-   * column in the wildcard column query. There maybe exists a null column in the explicit column
-   * query based on the first column.
-   */
-  private final boolean hasNullColumn;
-
-  /** readPoint over which the KVs are unconditionally included */
-  private final long maxReadPointToTrackVersions;
-
-  /**
-   * Oldest put in any of the involved store files Used to decide whether it is ok to delete family
-   * delete marker of this store keeps deleted KVs.
-   */
-  protected final long earliestPutTs;
-
-  private final byte[] stopRow;
-
-  private byte[] dropDeletesFromRow = null, dropDeletesToRow = null;
-
-  private LegacyScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
-      boolean hasNullColumn, DeleteTracker deletes, ScanType scanType, long readPointToUse,
-      long earliestPutTs, long oldestUnexpiredTS, long now) {
-    super(createStartKeyFromRow(scan.getStartRow(), scanInfo), scanInfo, columns, oldestUnexpiredTS,
-        now);
-    TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily());
-    if (timeRange == null) {
-      this.tr = scan.getTimeRange();
-    } else {
-      this.tr = timeRange;
-    }
-    this.hasNullColumn = hasNullColumn;
-    this.deletes = deletes;
-    this.filter = scan.getFilter();
-    this.maxReadPointToTrackVersions = readPointToUse;
-    this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
-    this.earliestPutTs = earliestPutTs;
-
-    /* how to deal with deletes */
-    this.keepDeletedCells = scanInfo.getKeepDeletedCells();
-    this.retainDeletesInOutput = scanType == ScanType.COMPACT_RETAIN_DELETES;
-    this.stopRow = scan.getStopRow();
-  }
-
-  private LegacyScanQueryMatcher(Scan scan, ScanInfo scanInfo, ColumnTracker columns,
-      boolean hasNullColumn, DeleteTracker deletes, ScanType scanType, long readPointToUse,
-      long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
-      byte[] dropDeletesToRow) {
-    this(scan, scanInfo, columns, hasNullColumn, deletes, scanType, readPointToUse, earliestPutTs,
-        oldestUnexpiredTS, now);
-    this.dropDeletesFromRow = Preconditions.checkNotNull(dropDeletesFromRow);
-    this.dropDeletesToRow = Preconditions.checkNotNull(dropDeletesToRow);
-  }
-
-  @Override
-  public void beforeShipped() throws IOException {
-    super.beforeShipped();
-    deletes.beforeShipped();
-  }
-
-  @Override
-  public MatchCode match(Cell cell) throws IOException {
-    if (filter != null && filter.filterAllRemaining()) {
-      return MatchCode.DONE_SCAN;
-    }
-    MatchCode returnCode = preCheck(cell);
-    if (returnCode != null) {
-      return returnCode;
-    }
-    /*
-     * The delete logic is pretty complicated now.
-     * This is corroborated by the following:
-     * 1. The store might be instructed to keep deleted rows around.
-     * 2. A scan can optionally see past a delete marker now.
-     * 3. If deleted rows are kept, we have to find out when we can
-     *    remove the delete markers.
-     * 4. Family delete markers are always first (regardless of their TS)
-     * 5. Delete markers should not be counted as version
-     * 6. Delete markers affect puts of the *same* TS
-     * 7. Delete marker need to be version counted together with puts
-     *    they affect
-     */
-    long timestamp = cell.getTimestamp();
-    byte typeByte = cell.getTypeByte();
-    long mvccVersion = cell.getSequenceId();
-    if (CellUtil.isDelete(typeByte)) {
-      if (keepDeletedCells == KeepDeletedCells.FALSE
-          || (keepDeletedCells == KeepDeletedCells.TTL && timestamp < oldestUnexpiredTS)) {
-        // first ignore delete markers if the scanner can do so, and the
-        // range does not include the marker
-        //
-        // during flushes and compactions also ignore delete markers newer
-        // than the readpoint of any open scanner, this prevents deleted
-        // rows that could still be seen by a scanner from being collected
-        boolean includeDeleteMarker = tr.withinOrAfterTimeRange(timestamp);
-        if (includeDeleteMarker && mvccVersion <= maxReadPointToTrackVersions) {
-          this.deletes.add(cell);
-        }
-        // Can't early out now, because DelFam come before any other keys
-      }
-
-      if (timeToPurgeDeletes > 0
-          && (EnvironmentEdgeManager.currentTime() - timestamp) <= timeToPurgeDeletes) {
-        return MatchCode.INCLUDE;
-      } else if (retainDeletesInOutput || mvccVersion > maxReadPointToTrackVersions) {
-        // always include or it is not time yet to check whether it is OK
-        // to purge deltes or not
-        // if this is not a user scan (compaction), we can filter this deletemarker right here
-        // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
-        return MatchCode.INCLUDE;
-      } else if (keepDeletedCells == KeepDeletedCells.TRUE
-          || (keepDeletedCells == KeepDeletedCells.TTL && timestamp >= oldestUnexpiredTS)) {
-        if (timestamp < earliestPutTs) {
-          // keeping delete rows, but there are no puts older than
-          // this delete in the store files.
-          return columns.getNextRowOrNextColumn(cell);
-        }
-        // else: fall through and do version counting on the
-        // delete markers
-      } else {
-        return MatchCode.SKIP;
-      }
-      // note the following next else if...
-      // delete marker are not subject to other delete markers
-    } else if (!this.deletes.isEmpty()) {
-      DeleteResult deleteResult = deletes.isDeleted(cell);
-      switch (deleteResult) {
-        case FAMILY_DELETED:
-        case COLUMN_DELETED:
-          return columns.getNextRowOrNextColumn(cell);
-        case VERSION_DELETED:
-        case FAMILY_VERSION_DELETED:
-          return MatchCode.SKIP;
-        case NOT_DELETED:
-          break;
-        default:
-          throw new RuntimeException("UNEXPECTED");
-        }
-    }
-
-    int timestampComparison = tr.compare(timestamp);
-    if (timestampComparison >= 1) {
-      return MatchCode.SKIP;
-    } else if (timestampComparison <= -1) {
-      return columns.getNextRowOrNextColumn(cell);
-    }
-
-    // STEP 1: Check if the column is part of the requested columns
-    MatchCode colChecker = columns.checkColumn(cell, typeByte);
-    if (colChecker == MatchCode.INCLUDE) {
-      ReturnCode filterResponse = ReturnCode.SKIP;
-      // STEP 2: Yes, the column is part of the requested columns. Check if filter is present
-      if (filter != null) {
-        // STEP 3: Filter the key value and return if it filters out
-        filterResponse = filter.filterKeyValue(cell);
-        switch (filterResponse) {
-        case SKIP:
-          return MatchCode.SKIP;
-        case NEXT_COL:
-          return columns.getNextRowOrNextColumn(cell);
-        case NEXT_ROW:
-          return MatchCode.SEEK_NEXT_ROW;
-        case SEEK_NEXT_USING_HINT:
-          return MatchCode.SEEK_NEXT_USING_HINT;
-        default:
-          //It means it is either include or include and seek next
-          break;
-        }
-      }
-      /*
-       * STEP 4: Reaching this step means the column is part of the requested columns and either
-       * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
-       * Now check the number of versions needed. This method call returns SKIP, INCLUDE,
-       * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
-       *
-       * FilterResponse            ColumnChecker               Desired behavior
-       * INCLUDE                   SKIP                        row has already been included, SKIP.
-       * INCLUDE                   INCLUDE                     INCLUDE
-       * INCLUDE                   INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
-       * INCLUDE                   INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
-       * INCLUDE_AND_SEEK_NEXT_COL SKIP                        row has already been included, SKIP.
-       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE                     INCLUDE_AND_SEEK_NEXT_COL
-       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL   INCLUDE_AND_SEEK_NEXT_COL
-       * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW   INCLUDE_AND_SEEK_NEXT_ROW
-       *
-       * In all the above scenarios, we return the column checker return value except for
-       * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
-       */
-      colChecker = columns.checkVersions(cell, timestamp, typeByte,
-          mvccVersion > maxReadPointToTrackVersions);
-      if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
-        if (colChecker != MatchCode.SKIP) {
-          return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
-        }
-        return MatchCode.SEEK_NEXT_ROW;
-      }
-      return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL &&
-          colChecker == MatchCode.INCLUDE) ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL
-          : colChecker;
-    }
-    return colChecker;
-  }
-
-  @Override
-  public boolean hasNullColumnInQuery() {
-    return hasNullColumn;
-  }
-
-  /**
-   * Handle partial-drop-deletes. As we match keys in order, when we have a range from which we can
-   * drop deletes, we can set retainDeletesInOutput to false for the duration of this range only,
-   * and maintain consistency.
-   */
-  private void checkPartialDropDeleteRange(Cell curCell) {
-    // If partial-drop-deletes are used, initially, dropDeletesFromRow and dropDeletesToRow
-    // are both set, and the matcher is set to retain deletes. We assume ordered keys. When
-    // dropDeletesFromRow is leq current kv, we start dropping deletes and reset
-    // dropDeletesFromRow; thus the 2nd "if" starts to apply.
-    if ((dropDeletesFromRow != null)
-        && (Arrays.equals(dropDeletesFromRow, HConstants.EMPTY_START_ROW)
-            || (CellComparator.COMPARATOR.compareRows(curCell, dropDeletesFromRow, 0,
-              dropDeletesFromRow.length) >= 0))) {
-      retainDeletesInOutput = false;
-      dropDeletesFromRow = null;
-    }
-    // If dropDeletesFromRow is null and dropDeletesToRow is set, we are inside the partial-
-    // drop-deletes range. When dropDeletesToRow is leq current kv, we stop dropping deletes,
-    // and reset dropDeletesToRow so that we don't do any more compares.
-    if ((dropDeletesFromRow == null) && (dropDeletesToRow != null)
-        && !Arrays.equals(dropDeletesToRow, HConstants.EMPTY_END_ROW) && (CellComparator.COMPARATOR
-            .compareRows(curCell, dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) {
-      retainDeletesInOutput = true;
-      dropDeletesToRow = null;
-    }
-  }
-
-  @Override
-  protected void reset() {
-    checkPartialDropDeleteRange(currentRow);
-  }
-
-  @Override
-  public boolean isUserScan() {
-    return false;
-  }
-
-  @Override
-  public boolean moreRowsMayExistAfter(Cell cell) {
-    if (this.stopRow == null || this.stopRow.length == 0) {
-      return true;
-    }
-    return rowComparator.compareRows(cell, stopRow, 0, stopRow.length) < 0;
-  }
-
-  @Override
-  public Filter getFilter() {
-    return filter;
-  }
-
-  @Override
-  public Cell getNextKeyHint(Cell cell) throws IOException {
-    if (filter == null) {
-      return null;
-    } else {
-      return filter.getNextCellHint(cell);
-    }
-  }
-
-  public static LegacyScanQueryMatcher create(Scan scan, ScanInfo scanInfo,
-      NavigableSet<byte[]> columns, ScanType scanType, long readPointToUse, long earliestPutTs,
-      long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow, byte[] dropDeletesToRow,
-      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
-    boolean hasNullColumn =
-        !(columns != null && columns.size() != 0 && columns.first().length != 0);
-    Pair<DeleteTracker, ColumnTracker> trackers = getTrackers(regionCoprocessorHost, null,
-        scanInfo, oldestUnexpiredTS, scan);
-    DeleteTracker deleteTracker = trackers.getFirst();
-    ColumnTracker columnTracker = trackers.getSecond();
-    if (dropDeletesFromRow == null) {
-      return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deleteTracker,
-          scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now);
-    } else {
-      return new LegacyScanQueryMatcher(scan, scanInfo, columnTracker, hasNullColumn, deleteTracker,
-          scanType, readPointToUse, earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow,
-          dropDeletesToRow);
-    }
-  }
-}
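
For reference, this deleted matcher was only reachable through the dispatch that the
StoreScanner hunks above remove: the compaction constructors fell back to the legacy
matcher whenever the caller-supplied Scan carried anything beyond the max-versions
hint. Condensed from the removed StoreScanner code:

    // Removed dispatch, condensed: any filter, row bound, or non-default time range
    // on the Scan forced the legacy matcher; otherwise the compaction matcher ran.
    if (scan.hasFilter() || scan.getStartRow().length > 0 || scan.getStopRow().length > 0
        || !scan.getTimeRange().isAllTime()) {
      matcher = LegacyScanQueryMatcher.create(scan, scanInfo, null, scanType, smallestReadPoint,
          earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
          store.getCoprocessorHost());
    } else {
      matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint,
          earliestPutTs, oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow,
          store.getCoprocessorHost());
    }
    // After this patch only the else-branch survives, guarded by
    // assert scanType != ScanType.USER_SCAN.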

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
index 62c8e7b..8dfc8aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.OptionalInt;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -269,19 +270,17 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
    private InternalScanner createCompactorScanner(Store store,
        List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs)
        throws IOException {
-      Scan scan = new Scan();
-      scan.setMaxVersions(store.getColumnFamilyDescriptor().getMaxVersions());
-      return new CompactorStoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
-          store.getSmallestReadPoint(), earliestPutTs);
+      return new CompactorStoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners,
+          scanType, store.getSmallestReadPoint(), earliestPutTs);
     }
   }
 
   private static class CompactorStoreScanner extends StoreScanner {
 
-    public CompactorStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+    public CompactorStoreScanner(Store store, ScanInfo scanInfo, OptionalInt maxVersions,
         List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
         long earliestPutTs) throws IOException {
-      super(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs);
+      super(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs);
     }
 
     @Override

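The hunk above is the migration pattern this commit applies at every compaction-scanner call site: the throwaway Scan that existed only to carry maxVersions is replaced by an OptionalInt. A minimal sketch, assuming `store`, `scanners`, `scanType` and `earliestPutTs` are in scope as in the hunk, and taking OptionalInt.empty() to mean "fall back to the ScanInfo's configured max versions":

    // Old style (removed): a dummy Scan object carried only the max-versions setting.
    // New style: no Scan object is needed on the compaction path at all.
    InternalScanner scanner = new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(),
        scanners, scanType, store.getSmallestReadPoint(), earliestPutTs);
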
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 48cb812..bae8d68 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -576,9 +576,9 @@ public class TestFromClientSide {
       }
 
       @Override
-      protected List<KeyValueScanner> selectScannersFrom(
+      protected List<KeyValueScanner> selectScannersFrom(Store store,
           List<? extends KeyValueScanner> allScanners) {
-        List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
+        List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
         List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
         for (KeyValueScanner scanner : scanners) {
           newScanners.add(new DelegatingKeyValueScanner(scanner) {

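selectScannersFrom now receives the owning Store in addition to the candidate scanners, so subclasses that override it must accept and forward the extra parameter. A hedged sketch of a minimal conforming override (the per-scanner wrapping the test applies is elided):

    @Override
    protected List<KeyValueScanner> selectScannersFrom(Store store,
        List<? extends KeyValueScanner> allScanners) {
      // Delegate to the default selection, then post-process the result as needed.
      List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
      return scanners;
    }
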
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
index 26cfed7..e9bf09b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
@@ -116,18 +117,31 @@ public class TestRegionObserverScannerOpenHook {
     }
   }
 
+  private static final InternalScanner NO_DATA = new InternalScanner() {
+
+    @Override
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {}
+  };
   /**
    * Don't allow any data in a flush by creating a custom {@link StoreScanner}.
    */
   public static class NoDataFromFlush implements RegionObserver {
+
     @Override
    public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
-      Scan scan = new Scan();
-      scan.setFilter(new NoDataFilter());
-      return new StoreScanner(store, store.getScanInfo(), scan,
-          scanners, ScanType.COMPACT_RETAIN_DELETES,
-          store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
+      scanners.forEach(KeyValueScanner::close);
+      return NO_DATA;
     }
   }
 
@@ -140,11 +154,8 @@ public class TestRegionObserverScannerOpenHook {
    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
        long earliestPutTs, InternalScanner s) throws IOException {
-      Scan scan = new Scan();
-      scan.setFilter(new NoDataFilter());
-      return new StoreScanner(store, store.getScanInfo(), scan, scanners,
-          ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
-          HConstants.OLDEST_TIMESTAMP);
+      scanners.forEach(KeyValueScanner::close);
+      return NO_DATA;
     }
   }
 

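Both hooks above used to suppress output by wrapping the scanners in a StoreScanner with a NoDataFilter; they now short-circuit with the shared no-op NO_DATA scanner instead. One detail worth noting: since no StoreScanner takes ownership of the passed-in scanners anymore, the hook must close them itself or the underlying readers would leak. The shape of the pattern, under the same hook signature as the hunk:

    @Override
    public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
        Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
      scanners.forEach(KeyValueScanner::close); // nothing else will release these now
      return NO_DATA; // the empty InternalScanner defined above
    }
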
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
index 2fe8085..b8e1204 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestPartitionedMobCompactor.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy;
-import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
@@ -883,13 +882,10 @@ public class TestPartitionedMobCompactor {
     }
    List<KeyValueScanner> scanners = new ArrayList<>(StoreFileScanner.getScannersForStoreFiles(sfs,
      false, true, false, false, HConstants.LATEST_TIMESTAMP));
-    Scan scan = new Scan();
-    scan.setMaxVersions(hcd.getMaxVersions());
    long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
    long ttl = HStore.determineTTLFromFamily(hcd);
    ScanInfo scanInfo = new ScanInfo(conf, hcd, ttl, timeToPurgeDeletes, CellComparator.COMPARATOR);
-    StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_RETAIN_DELETES, null,
-        scanners, 0L, HConstants.LATEST_TIMESTAMP);
+    StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners);
     List<Cell> results = new ArrayList<>();
     boolean hasMore = true;
 

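The rewritten call uses the trimmed-down, test-oriented StoreScanner constructor that takes only a ScanInfo, a ScanType and the scanners; the read point and timestamp arguments of the old form are gone. A sketch of how these tests typically drain such a scanner (assertion details elided; names as in the test):

    StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_RETAIN_DELETES, scanners);
    List<Cell> results = new ArrayList<>();
    boolean hasMore = true;
    try {
      while (hasMore) {
        hasMore = scanner.next(results); // one batch of cells per call
        // ... inspect results ...
        results.clear(); // clear between batches, or cells accumulate
      }
    } finally {
      scanner.close();
    }
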
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
index b090cdd..5423578 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java
@@ -19,9 +19,9 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.OptionalInt;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
  * of functionality still behaves as expected.
  */
 public class NoOpScanPolicyObserver implements RegionObserver {
+
   /**
    * Reimplement the default behavior
    */
@@ -45,11 +46,9 @@ public class NoOpScanPolicyObserver implements RegionObserver {
   public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
       Store store, List<KeyValueScanner> scanners, InternalScanner s) throws IOException {
     ScanInfo oldSI = store.getScanInfo();
-    ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), oldSI.getTtl(),
-        oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
-    Scan scan = new Scan();
-    scan.setMaxVersions(oldSI.getMaxVersions());
-    return new StoreScanner(store, scanInfo, scan, scanners,
+    ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(),
+        oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+    return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners,
         ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
   }
 
@@ -57,16 +56,15 @@ public class NoOpScanPolicyObserver implements RegionObserver {
    * Reimplement the default behavior
    */
   @Override
-  public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
+  public InternalScanner preCompactScannerOpen(
+      final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
       InternalScanner s) throws IOException {
     // this demonstrates how to override the scanners default behavior
     ScanInfo oldSI = store.getScanInfo();
-    ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), oldSI.getTtl(),
-        oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
-    Scan scan = new Scan();
-    scan.setMaxVersions(oldSI.getMaxVersions());
-    return new StoreScanner(store, scanInfo, scan, scanners, scanType,
+    ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(),
+        oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
+    return new StoreScanner(store, scanInfo, OptionalInt.empty(), scanners, scanType,
         store.getSmallestReadPoint(), earliestPutTs);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
index 8118e41..dc3cf4d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java
@@ -194,25 +194,25 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
     for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
       ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
           KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
-      ScanType scanType = ScanType.USER_SCAN;
-      InternalScanner scanner = new StoreScanner(new Scan(
-          Bytes.toBytes(startRowId)), scanInfo, scanType, null,
-          memstore.getScanners(0));
-      List<Cell> results = new ArrayList<>();
-      for (int i = 0; scanner.next(results); i++) {
-        int rowId = startRowId + i;
-        Cell left = results.get(0);
-        byte[] row1 = Bytes.toBytes(rowId);
-        assertTrue("Row name",
+      try (InternalScanner scanner =
+          new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
+              memstore.getScanners(0))) {
+        List<Cell> results = new ArrayList<>();
+        for (int i = 0; scanner.next(results); i++) {
+          int rowId = startRowId + i;
+          Cell left = results.get(0);
+          byte[] row1 = Bytes.toBytes(rowId);
+          assertTrue("Row name",
            CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0);
-        assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
-        List<Cell> row = new ArrayList<>();
-        for (Cell kv : results) {
-          row.add(kv);
+          assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
+          List<Cell> row = new ArrayList<>();
+          for (Cell kv : results) {
+            row.add(kv);
+          }
+          isExpectedRowWithoutTimestamps(rowId, row);
+          // Clear out set. Otherwise row results accumulate.
+          results.clear();
         }
-        isExpectedRowWithoutTimestamps(rowId, row);
-        // Clear out set.  Otherwise row results accumulate.
-        results.clear();
       }
     }
   }

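Besides dropping the ScanType argument, the rewrite above switches the deprecated `new Scan(byte[])` start-row constructor to the builder-style `withStartRow`, and scopes the scanner with try-with-resources so it is closed even when an assertion fails mid-loop. The idiom in isolation (scanInfo and memstore as in the test; a null column set selects all columns):

    try (InternalScanner scanner =
        new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
            memstore.getScanners(0))) {
      List<Cell> results = new ArrayList<>();
      while (scanner.next(results)) {
        // ... verify the batch ...
        results.clear(); // otherwise rows accumulate across iterations
      }
    } // close() runs automatically here
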
http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 7b10846..b36b8fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -18,9 +18,17 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,22 +63,13 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertNotNull;
-
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 
 /** memstore test case */
 @Category({RegionServerTests.class, MediumTests.class})
@@ -164,10 +163,8 @@ public class TestDefaultMemStore {
     Configuration conf = HBaseConfiguration.create();
     ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP,
         KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
-    ScanType scanType = ScanType.USER_SCAN;
-    StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     int count = 0;
-    try {
+    try (StoreScanner s = new StoreScanner(scan, scanInfo, null, memstorescanners)) {
       while (s.next(result)) {
         LOG.info(result);
         count++;
@@ -175,8 +172,6 @@ public class TestDefaultMemStore {
         assertEquals(rowCount, result.size());
         result.clear();
       }
-    } finally {
-      s.close();
     }
     assertEquals(rowCount, count);
     for (KeyValueScanner scanner : memstorescanners) {
@@ -185,9 +180,8 @@ public class TestDefaultMemStore {
 
     memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
     // Now assert can count same number even if a snapshot mid-scan.
-    s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
-    try {
+    try (StoreScanner s = new StoreScanner(scan, scanInfo, null, memstorescanners)) {
       while (s.next(result)) {
         LOG.info(result);
         // Assert the stuff is coming out in right order.
@@ -201,8 +195,6 @@ public class TestDefaultMemStore {
         }
         result.clear();
       }
-    } finally {
-      s.close();
     }
     assertEquals(rowCount, count);
     for (KeyValueScanner scanner : memstorescanners) {
@@ -211,10 +203,9 @@ public class TestDefaultMemStore {
     memstorescanners = this.memstore.getScanners(mvcc.getReadPoint());
     // Assert that new values are seen in kvset as we scan.
     long ts = System.currentTimeMillis();
-    s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
     int snapshotIndex = 5;
-    try {
+    try (StoreScanner s = new StoreScanner(scan, scanInfo, null, memstorescanners)) {
       while (s.next(result)) {
         LOG.info(result);
         // Assert the stuff is coming out in right order.
@@ -225,14 +216,12 @@ public class TestDefaultMemStore {
         if (count == snapshotIndex) {
           MemStoreSnapshot snapshot = this.memstore.snapshot();
           this.memstore.clearSnapshot(snapshot.getId());
-          // Added more rows into kvset.  But the scanner wont see these rows.
+          // Added more rows into kvset. But the scanner wont see these rows.
           addRows(this.memstore, ts);
           LOG.info("Snapshotted, cleared it and then added values (which wont be seen)");
         }
         result.clear();
       }
-    } finally {
-      s.close();
     }
     assertEquals(rowCount, count);
   }
@@ -600,27 +589,26 @@ public class TestDefaultMemStore {
     //starting from each row, validate results should contain the starting row
     Configuration conf = HBaseConfiguration.create();
     for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
-      ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
-          KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
-      ScanType scanType = ScanType.USER_SCAN;
-      try (InternalScanner scanner = new StoreScanner(new Scan(
-          Bytes.toBytes(startRowId)), scanInfo, scanType, null,
-          memstore.getScanners(0))) {
+      ScanInfo scanInfo =
+          new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE,
+              HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
+      try (InternalScanner scanner =
+          new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
+              memstore.getScanners(0))) {
         List<Cell> results = new ArrayList<>();
         for (int i = 0; scanner.next(results); i++) {
           int rowId = startRowId + i;
           Cell left = results.get(0);
           byte[] row1 = Bytes.toBytes(rowId);
-          assertTrue(
-              "Row name",
-              CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0);
+          assertTrue("Row name",
+            CellComparator.COMPARATOR.compareRows(left, row1, 0, row1.length) == 0);
           assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
           List<Cell> row = new ArrayList<>();
           for (Cell kv : results) {
             row.add(kv);
           }
           isExpectedRowWithoutTimestamps(rowId, row);
-          // Clear out set.  Otherwise row results accumulate.
+          // Clear out set. Otherwise row results accumulate.
           results.clear();
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
index 21089ed..9ab1440 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreCompaction.java
@@ -442,14 +442,11 @@ public class TestMobStoreCompaction {
 
      List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(sfs, false, true, false, false,
           HConstants.LATEST_TIMESTAMP);
-      Scan scan = new Scan();
-      scan.setMaxVersions(hcd.getMaxVersions());
      long timeToPurgeDeletes = Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
      long ttl = HStore.determineTTLFromFamily(hcd);
      ScanInfo scanInfo = new ScanInfo(copyOfConf, hcd, ttl, timeToPurgeDeletes,
        CellComparator.COMPARATOR);
-      StoreScanner scanner = new StoreScanner(scan, scanInfo, ScanType.COMPACT_DROP_DELETES, null,
-          scanners, 0L, HConstants.LATEST_TIMESTAMP);
+      StoreScanner scanner = new StoreScanner(scanInfo, ScanType.COMPACT_DROP_DELETES, scanners);
       try {
         size += UTIL.countRows(scanner);
       } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8d33949b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 3782fdb..8b34a2f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -68,6 +66,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
+
 /**
  * Test cases against ReversibleKeyValueScanner
  */
@@ -263,7 +263,6 @@ public class TestReversibleScanners {
     StoreFile sf2 = new HStoreFile(fs, writer2.getPath(), TEST_UTIL.getConfiguration(), cacheConf,
         BloomType.NONE, true);
 
-    ScanType scanType = ScanType.USER_SCAN;
     ScanInfo scanInfo =
         new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
             KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR, false);
@@ -271,16 +270,15 @@ public class TestReversibleScanners {
     // Case 1.Test a full reversed scan
     Scan scan = new Scan();
     scan.setReversed(true);
-    StoreScanner storeScanner = getReversibleStoreScanner(memstore, sf1, sf2,
-        scan, scanType, scanInfo, MAXMVCC);
+    StoreScanner storeScanner =
+        getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
     verifyCountAndOrder(storeScanner, QUALSIZE * ROWSIZE, ROWSIZE, false);
 
     // Case 2.Test reversed scan with a specified start row
     int startRowNum = ROWSIZE / 2;
     byte[] startRow = ROWS[startRowNum];
     scan.withStartRow(startRow);
-    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
-        scanType, scanInfo, MAXMVCC);
+    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
     verifyCountAndOrder(storeScanner, QUALSIZE * (startRowNum + 1),
         startRowNum + 1, false);
 
@@ -289,16 +287,14 @@ public class TestReversibleScanners {
     assertTrue(QUALSIZE > 2);
     scan.addColumn(FAMILYNAME, QUALS[0]);
     scan.addColumn(FAMILYNAME, QUALS[2]);
-    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
-        scanType, scanInfo, MAXMVCC);
+    storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, MAXMVCC);
     verifyCountAndOrder(storeScanner, 2 * (startRowNum + 1), startRowNum + 1,
         false);
 
     // Case 4.Test reversed scan with mvcc based on case 3
     for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
       LOG.info("Setting read point to " + readPoint);
-      storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan,
-          scanType, scanInfo, readPoint);
+      storeScanner = getReversibleStoreScanner(memstore, sf1, sf2, scan, scanInfo, readPoint);
       int expectedRowCount = 0;
       int expectedKVCount = 0;
       for (int i = startRowNum; i >= 0; i--) {
@@ -423,7 +419,7 @@ public class TestReversibleScanners {
   }
 
   private StoreScanner getReversibleStoreScanner(MemStore memstore,
-      StoreFile sf1, StoreFile sf2, Scan scan, ScanType scanType,
+      StoreFile sf1, StoreFile sf2, Scan scan,
       ScanInfo scanInfo, int readPoint) throws IOException {
     List<KeyValueScanner> scanners = getScanners(memstore, sf1, sf2, null,
         false, readPoint);
@@ -434,7 +430,7 @@ public class TestReversibleScanners {
       columns = entry.getValue();
     }
     StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo,
-        scanType, columns, scanners);
+         columns, scanners);
     return storeScanner;
   }
 
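With ScanType removed from the user-scan constructors, scan direction now comes solely from the Scan object. A minimal sketch of opening a reversed scanner under the same assumptions as the test (columns may be null to select all columns):

    Scan scan = new Scan();
    scan.setReversed(true); // direction is a property of the Scan, not a constructor argument
    StoreScanner storeScanner = new ReversedStoreScanner(scan, scanInfo, columns, scanners);
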
