This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 97c63c1cae PHOENIX-7287 Leverage bloom filters for multi-key point
lookups (#1923)
97c63c1cae is described below
commit 97c63c1cae90a826dc44b23dbc53954961206ecc
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Tue Jul 9 18:04:47 2024 -0700
PHOENIX-7287 Leverage bloom filters for multi-key point lookups (#1923)
---
.../org/apache/phoenix/compile/ScanRanges.java | 2 +-
.../execute/PhoenixTxIndexMutationGenerator.java | 2 +-
.../org/apache/phoenix/filter/PagingFilter.java | 109 +++++----
.../org/apache/phoenix/filter/SkipScanFilter.java | 48 +++-
.../java/org/apache/phoenix/util/ScanUtil.java | 44 ++++
.../coprocessor/GlobalIndexRegionScanner.java | 4 +-
.../coprocessor/IndexRepairRegionScanner.java | 2 +-
.../phoenix/coprocessor/PagingRegionScanner.java | 260 ++++++++++++++++-----
.../coprocessor/UncoveredIndexRegionScanner.java | 2 +-
.../hbase/index/covered/data/CachedLocalTable.java | 2 +-
.../java/org/apache/phoenix/end2end/InListIT.java | 38 ++-
.../org/apache/phoenix/end2end/TableTTLIT.java | 3 +-
.../apache/phoenix/compile/WhereCompilerTest.java | 14 +-
.../filter/SkipScanFilterIntersectTest.java | 2 +-
.../apache/phoenix/filter/SkipScanFilterTest.java | 4 +-
.../phoenix/query/ParallelIteratorsSplitTest.java | 4 +-
16 files changed, 391 insertions(+), 149 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ScanRanges.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ScanRanges.java
index e9de7b75a6..1667bf1058 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ScanRanges.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/ScanRanges.java
@@ -198,7 +198,7 @@ public class ScanRanges {
ranges = ranges.subList(0, boundSlotCount);
slotSpan = Arrays.copyOf(slotSpan, boundSlotCount);
}
- this.filter = new SkipScanFilter(ranges, slotSpan, this.schema);
+ this.filter = new SkipScanFilter(ranges, slotSpan, this.schema,
isPointLookup);
}
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index 91f0c92f4c..0b4423ba0f 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -199,7 +199,7 @@ public class PhoenixTxIndexMutationGenerator {
// checkpointed versions.
SkipScanFilter filter = scanRanges.getSkipScanFilter();
if (isRollback) {
- filter = new SkipScanFilter(filter,true);
+ filter = new SkipScanFilter(filter,true, false);
indexMetaData.getTransactionContext().setVisibilityLevel(PhoenixVisibilityLevel.SNAPSHOT_ALL);
}
scan.setFilter(filter);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/PagingFilter.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/PagingFilter.java
index 988efddf6a..eb9eb3b426 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/PagingFilter.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/PagingFilter.java
@@ -35,27 +35,54 @@ import org.apache.hadoop.io.Writable;
import org.apache.phoenix.util.EnvironmentEdgeManager;
/**
- * This is a top level Phoenix filter which is injected to a scan at the
server side. If the scan has
- * already a filter then PagingFilter wraps it. This filter for server paging.
It makes sure that
- * the scan does not take more than pageSizeInMs.
+ * This is a top level Phoenix filter which is injected to a scan at the
server side. If the scan
+ * already has a filter then PagingFilter wraps it. This filter is for server
pagination. It makes
+ * sure that the scan does not take more than pageSizeInMs.
+ *
+ * PagingRegionScanner initializes PagingFilter before retrieving a row. The
state of PagingFilter
+ * consists of three variables startTime, isStopped, and currentCell. During
this
+ * initialization, startTime is set to the current time, isStopped to false,
and currentCell to null.
+ *
+ * PagingFilter implements the paging state machine in three filter methods
that are
+ * hasFilterRow(), filterAllRemaining(), and filterRowKey(). These methods are
called in the
+ * following order for each row: hasFilterRow(), filterAllRemaining(),
filterRowKey(), and
+ * filterAllRemaining(). Please note that filterAllRemaining() is called twice
(before and after
+ * filterRowKey()). Sometimes, filterAllRemaining() is called multiple times
back to back.
+ *
+ * In hasFilterRow(), if currentCell is not null, meaning that at least one
row has been
+ * scanned, and it is time to page out, then PagingFilter sets isStopped to
true.
+ *
+ * In filterAllRemaining(), PagingFilter returns true if isStopped is true.
Returning true from this
+ * method causes the HBase region scanner to signal the caller (that is
PagingRegionScanner in this
+ * case) that there are no more rows to scan by returning false from the
next() call. In that case,
+ * PagingRegionScanner checks if PagingFilter is stopped. If PagingFilter is
stopped, then it means
the last next() call paged out rather than the scan operation reached
its last row.
+ * Please note it is crucial that PagingFilter returns true in the first
filterAllRemaining() call
+ * for a given row. This allows the HBase region scanner to resume
scanning rows when the
+ * next() method is called even though the region scanner already signaled the
caller that there
+ * were no more rows to scan. PagingRegionScanner leverages this behavior to
resume the scan
+ * operation using the same scanner instead of closing the current one and
starting a new scanner. If
+ * this specific HBase region scanner behavior changes, it will cause server
paging test failures.
+ * To fix them, the PagingRegionScanner code needs to change such that
PagingRegionScanner needs to
+ * create a new scanner with adjusted start row to resume the scan operation
after PagingFilter
+ * stops.
+ *
+ * If the scan operation has not been terminated by PagingFilter, HBase
subsequently calls
+ * filterRowKey(). In this method, PagingFilter records the last row that is
scanned.
+ *
*/
public class PagingFilter extends FilterBase implements Writable {
- private enum State {
- INITIAL, STARTED, TIME_TO_STOP, STOPPED
- }
- State state;
private long pageSizeMs;
private long startTime;
// tracks the row we last visited
private Cell currentCell;
+ private boolean isStopped;
private Filter delegate = null;
public PagingFilter() {
- init();
}
public PagingFilter(Filter delegate, long pageSizeMs) {
- init();
this.delegate = delegate;
this.pageSizeMs = pageSizeMs;
}
@@ -77,38 +104,33 @@ public class PagingFilter extends FilterBase implements
Writable {
}
public boolean isStopped() {
- return state == State.STOPPED;
+ return isStopped;
}
public void init() {
- state = State.INITIAL;
+ isStopped = false;
currentCell = null;
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
}
@Override
- public void reset() throws IOException {
- long currentTime = EnvironmentEdgeManager.currentTimeMillis();
- // reset can be called multiple times for the same row sometimes even
before we have
- // scanned even one row. The order in which it is called is not very
predictable.
- // So we need to ensure that we have seen at least one row before we
page.
- // The currentCell != null check ensures that.
- if (state == State.STARTED && currentCell != null
- && currentTime - startTime >= pageSizeMs) {
- state = State.TIME_TO_STOP;
- }
- if (delegate != null) {
- delegate.reset();
- return;
+ public boolean hasFilterRow() {
+ if (currentCell != null
+ && EnvironmentEdgeManager.currentTimeMillis() - startTime >=
pageSizeMs) {
+ isStopped = true;
}
- super.reset();
+ return true;
}
@Override
- public Cell getNextCellHint(Cell currentKV) throws IOException {
+ public boolean filterAllRemaining() throws IOException {
+ if (isStopped) {
+ return true;
+ }
if (delegate != null) {
- return delegate.getNextCellHint(currentKV);
+ return delegate.filterAllRemaining();
}
- return super.getNextCellHint(currentKV);
+ return super.filterAllRemaining();
}
@Override
@@ -121,37 +143,24 @@ public class PagingFilter extends FilterBase implements
Writable {
}
@Override
- public boolean filterAllRemaining() throws IOException {
- if (state == State.TIME_TO_STOP) {
- state = State.STOPPED;
- return true;
- }
- if (state == State.STOPPED) {
- return true;
- }
+ public void reset() throws IOException {
if (delegate != null) {
- return delegate.filterAllRemaining();
+ delegate.reset();
+ return;
}
- return super.filterAllRemaining();
+ super.reset();
}
@Override
- /**
- * This is called once for every row in the beginning.
- */
- public boolean hasFilterRow() {
- if (state == State.INITIAL) {
- startTime = EnvironmentEdgeManager.currentTimeMillis();
- state = State.STARTED;
+ public Cell getNextCellHint(Cell currentKV) throws IOException {
+ if (delegate != null) {
+ return delegate.getNextCellHint(currentKV);
}
- return true;
+ return super.getNextCellHint(currentKV);
}
@Override
public boolean filterRow() throws IOException {
- if (state == State.TIME_TO_STOP) {
- return true;
- }
if (delegate != null) {
return delegate.filterRow();
}
@@ -201,7 +210,6 @@ public class PagingFilter extends FilterBase implements
Writable {
@Override
public ReturnCode filterKeyValue(Cell v) throws IOException {
-
if (delegate != null) {
return delegate.filterKeyValue(v);
}
@@ -210,7 +218,6 @@ public class PagingFilter extends FilterBase implements
Writable {
@Override
public Filter.ReturnCode filterCell(Cell c) throws IOException {
- currentCell = c;
if (delegate != null) {
return delegate.filterCell(c);
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
index 30728d366c..a9cb711653 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/SkipScanFilter.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -79,6 +80,7 @@ public class SkipScanFilter extends FilterBase implements
Writable {
private int endKeyLength;
private boolean isDone;
private int offset;
+ private boolean isMultiKeyPointLookup;
private Map<ImmutableBytesWritable, Cell> nextCellHintMap =
new HashMap<ImmutableBytesWritable, Cell>();
@@ -91,27 +93,42 @@ public class SkipScanFilter extends FilterBase implements
Writable {
public SkipScanFilter() {
}
- public SkipScanFilter(SkipScanFilter filter, boolean
includeMultipleVersions) {
- this(filter.slots, filter.slotSpan, filter.schema,
includeMultipleVersions);
+ public SkipScanFilter(SkipScanFilter filter, boolean
includeMultipleVersions,
+ boolean isMultiKeyPointLookup) {
+ this(filter.slots, filter.slotSpan, filter.schema,
includeMultipleVersions,
+ isMultiKeyPointLookup);
}
- public SkipScanFilter(List<List<KeyRange>> slots, RowKeySchema schema) {
- this(slots, ScanUtil.getDefaultSlotSpans(slots.size()), schema);
+ public SkipScanFilter(List<List<KeyRange>> slots, RowKeySchema schema,
boolean isMultiKeyPointLookup) {
+ this(slots, ScanUtil.getDefaultSlotSpans(slots.size()), schema,
isMultiKeyPointLookup);
}
- public SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan,
RowKeySchema schema) {
- this(slots, slotSpan, schema, false);
+ public SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan,
RowKeySchema schema,
+ boolean isMultiKeyPointLookup) {
+ this(slots, slotSpan, schema, false, isMultiKeyPointLookup);
}
- private SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan,
RowKeySchema schema, boolean includeMultipleVersions) {
- init(slots, slotSpan, schema, includeMultipleVersions);
+ private SkipScanFilter(List<List<KeyRange>> slots, int[] slotSpan,
RowKeySchema schema,
+ boolean includeMultipleVersions, boolean isMultiKeyPointLookup) {
+ init(slots, slotSpan, schema, includeMultipleVersions,
isMultiKeyPointLookup);
}
public void setOffset(int offset) {
this.offset = offset;
}
+ public int getOffset() {
+ return offset;
+ }
+ public boolean isMultiKeyPointLookup() {
+ return isMultiKeyPointLookup;
+ }
+
+ public List<KeyRange> getPointLookupKeyRanges() {
+ return isMultiKeyPointLookup ? slots.get(0) : Collections.emptyList();
+ }
- private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema
schema, boolean includeMultipleVersions) {
+ private void init(List<List<KeyRange>> slots, int[] slotSpan, RowKeySchema
schema,
+ boolean includeMultipleVersions, boolean isPointLookup) {
for (List<KeyRange> ranges : slots) {
if (ranges.isEmpty()) {
throw new IllegalStateException();
@@ -126,6 +143,7 @@ public class SkipScanFilter extends FilterBase implements
Writable {
this.endKey = new byte[maxKeyLength];
this.endKeyLength = 0;
this.includeMultipleVersions = includeMultipleVersions;
+ this.isMultiKeyPointLookup = isPointLookup;
}
// Exposed for testing.
@@ -194,7 +212,7 @@ public class SkipScanFilter extends FilterBase implements
Writable {
public SkipScanFilter intersect(byte[] lowerInclusiveKey, byte[]
upperExclusiveKey) {
List<List<KeyRange>> newSlots =
Lists.newArrayListWithCapacity(slots.size());
if (intersect(lowerInclusiveKey, upperExclusiveKey, newSlots)) {
- return new SkipScanFilter(newSlots, slotSpan, schema);
+ return new SkipScanFilter(newSlots, slotSpan, schema,
isMultiKeyPointLookup);
}
return null;
}
@@ -618,7 +636,14 @@ public class SkipScanFilter extends FilterBase implements
Writable {
orClause.add(range);
}
}
- this.init(slots, slotSpan, schema, includeMultipleVersions);
+ try {
+ boolean isPointLookup = in.readBoolean();
+ this.init(slots, slotSpan, schema, includeMultipleVersions,
isPointLookup);
+ } catch (IOException e) {
+ // Reached the end of the stream before reading the boolean field.
The client can be
+ // an older client
+ this.init(slots, slotSpan, schema, includeMultipleVersions, false);
+ }
}
@Override
@@ -636,6 +661,7 @@ public class SkipScanFilter extends FilterBase implements
Writable {
range.write(out);
}
}
+ out.writeBoolean(isMultiKeyPointLookup);
}
@Override
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index c978acf6f6..13a8aeda8e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1637,6 +1637,50 @@ public class ScanUtil {
return null;
}
+ public static SkipScanFilter removeSkipScanFilterFromFilterList(FilterList
filterList) {
+ Iterator<Filter> filterIterator = filterList.getFilters().iterator();
+ while (filterIterator.hasNext()) {
+ Filter filter = filterIterator.next();
+ if (filter instanceof SkipScanFilter
+ && ((SkipScanFilter) filter).isMultiKeyPointLookup()) {
+ filterIterator.remove();
+ return (SkipScanFilter) filter;
+ } else if (filter instanceof FilterList) {
+ SkipScanFilter skipScanFilter =
removeSkipScanFilterFromFilterList((FilterList) filter);
+ if (skipScanFilter != null) {
+ return skipScanFilter;
+ }
+ }
+ }
+ return null;
+ }
+ public static SkipScanFilter removeSkipScanFilter(Scan scan) {
+ Filter filter = scan.getFilter();
+ if (filter != null) {
+ PagingFilter pagingFilter = null;
+ if (filter instanceof PagingFilter) {
+ pagingFilter = (PagingFilter) filter;
+ filter = pagingFilter.getDelegateFilter();
+ if (filter == null) {
+ return null;
+ }
+ }
+ if (filter instanceof SkipScanFilter
+ && ((SkipScanFilter) filter).isMultiKeyPointLookup()) {
+ if (pagingFilter != null) {
+ pagingFilter.setDelegateFilter(null);
+ scan.setFilter(pagingFilter);
+ } else {
+ scan.setFilter(null);
+ }
+ return (SkipScanFilter) filter;
+ } else if (filter instanceof FilterList) {
+ return removeSkipScanFilterFromFilterList((FilterList) filter);
+ }
+ }
+ return null;
+ }
+
/**
* Verify whether the given row key is in the scan boundaries i.e. scan
start and end keys.
*
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 5af9950aa6..5706916cae 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -1099,7 +1099,7 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
indexScan.setTimeRange(scan.getTimeRange().getMin(),
scan.getTimeRange().getMax());
scanRanges.initializeScan(indexScan);
SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
- indexScan.setFilter(new SkipScanFilter(skipScanFilter, true));
+ indexScan.setFilter(new SkipScanFilter(skipScanFilter, true, true));
indexScan.setRaw(true);
indexScan.readAllVersions();
indexScan.setCacheBlocks(false);
@@ -1502,7 +1502,7 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
scanRanges.initializeScan(incrScan);
SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
- incrScan.setFilter(new SkipScanFilter(skipScanFilter, true));
+ incrScan.setFilter(new SkipScanFilter(skipScanFilter, true, true));
//putting back the min time to 0 for index and data reads
incrScan.setTimeRange(0, scan.getTimeRange().getMax());
scan.setTimeRange(0, scan.getTimeRange().getMax());
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index 74c8e2edf6..19707bd533 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -152,7 +152,7 @@ public class IndexRepairRegionScanner extends
GlobalIndexRegionScanner {
dataScan.setTimeRange(scan.getTimeRange().getMin(),
scan.getTimeRange().getMax());
scanRanges.initializeScan(dataScan);
SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
- dataScan.setFilter(new SkipScanFilter(skipScanFilter, true));
+ dataScan.setFilter(new SkipScanFilter(skipScanFilter, true, true));
dataScan.setRaw(true);
dataScan.readAllVersions();
dataScan.setCacheBlocks(false);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
index 965cd370c9..719ce43731 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
@@ -23,11 +23,16 @@ import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.filter.PagingFilter;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,82 +44,225 @@ import org.slf4j.LoggerFactory;
* to be filtered out), PagingFilter stops the HBase region scanner and sets
its state
* to STOPPED. In this case, the HBase region scanner next() returns false and
* PagingFilter#isStopped() returns true. PagingRegionScanner is responsible
for detecting
- * PagingFilter has stopped the scanner, and then closing the current HBase
region scanner,
- * starting a new one to resume the scan operation and returning a dummy
result to signal to
+ * PagingFilter has stopped the scanner, and returning a dummy result to
signal to
* Phoenix client to resume the scan operation by skipping this dummy result
and calling
* ResultScanner#next().
+ *
+ * PagingRegionScanner also converts a multi-key point lookup scan into N
single point lookup
+ * scans to allow individual scan to leverage HBase bloom filter. This
conversion is done within
+ * the MultiKeyPointLookup inner class.
*/
public class PagingRegionScanner extends BaseRegionScanner {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PagingRegionScanner.class);
private Region region;
private Scan scan;
private PagingFilter pagingFilter;
+ private MultiKeyPointLookup multiKeyPointLookup = null;
+ private boolean initialized = false;
- private static final Logger LOGGER =
LoggerFactory.getLogger(PagingRegionScanner.class);
+ private class MultiKeyPointLookup {
+ private SkipScanFilter skipScanFilter;
+ private List<KeyRange> pointLookupRanges = null;
+ private int lookupPosition = 0;
+ private byte[] lookupKeyPrefix = null;
+ private long pageSizeMs;
- public PagingRegionScanner(Region region, RegionScanner scanner, Scan
scan) {
- super(scanner);
- this.region = region;
- this.scan = scan;
- pagingFilter = ScanUtil.getPhoenixPagingFilter(scan);
- if (pagingFilter != null) {
- pagingFilter.init();
+ private MultiKeyPointLookup(SkipScanFilter skipScanFilter) throws
IOException {
+ this.skipScanFilter = skipScanFilter;
+ pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
+ pointLookupRanges = skipScanFilter.getPointLookupKeyRanges();
+ lookupPosition = findLookupPosition(scan.getStartRow());
+ if (skipScanFilter.getOffset() > 0) {
+ lookupKeyPrefix = new byte[skipScanFilter.getOffset()];
+ System.arraycopy(scan.getStartRow(), 0, lookupKeyPrefix, 0,
+ skipScanFilter.getOffset());
+ }
+ // A point lookup scan does not need to have a paging filter
+ if (pagingFilter != null) {
+ scan.setFilter(pagingFilter.getDelegateFilter());
+ }
}
- }
- private boolean next(List<Cell> results, boolean raw) throws IOException {
- try {
- byte[] adjustedStartRowKey =
-
scan.getAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY);
- byte[] adjustedStartRowKeyIncludeBytes =
-
scan.getAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE);
- // If scanners at higher level needs to re-scan the data that were
already scanned
- // earlier, they can provide adjusted new start rowkey for the
scan and whether to
- // include it.
- // If they are set as the scan attributes, close the scanner,
reopen it with
- // updated start rowkey and whether to include it. Update mvcc
read point from the
- // previous scanner and set it back to the new scanner to maintain
the read
- // consistency for the given region.
- // Once done, continue the scan operation and reset the attributes.
- if (adjustedStartRowKey != null && adjustedStartRowKeyIncludeBytes
!= null) {
- long mvccReadPoint = delegate.getMvccReadPoint();
- delegate.close();
- scan.withStartRow(adjustedStartRowKey,
- Bytes.toBoolean(adjustedStartRowKeyIncludeBytes));
- PackagePrivateFieldAccessor.setMvccReadPoint(scan,
mvccReadPoint);
- delegate = region.getScanner(scan);
-
scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY, null);
-
scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE,
null);
+ private int findLookupPosition(byte[] startRowKey) {
+ for (int i = 0; i < pointLookupRanges.size(); i++) {
+ byte[] rowKey = pointLookupRanges.get(i).getLowerRange();
+ if (Bytes.compareTo(startRowKey, skipScanFilter.getOffset(),
+ startRowKey.length - skipScanFilter.getOffset(),
rowKey, 0,
+ rowKey.length) <= 0) {
+ return i;
+ }
}
- if (pagingFilter != null) {
- pagingFilter.init();
+ return pointLookupRanges.size();
+ }
+
+ private boolean verifyStartRowKey(byte[] startRowKey) {
+ // The startRowKey may not be one of the point lookup keys. This
happens when
+ // the region moves and the HBase client adjusts the scan start
row key.
+ lookupPosition = findLookupPosition(startRowKey);
+ if (lookupPosition == pointLookupRanges.size()) {
+ return false;
}
- boolean hasMore = raw ? delegate.nextRaw(results) :
delegate.next(results);
- if (pagingFilter == null) {
- return hasMore;
+ byte[] rowKey =
pointLookupRanges.get(lookupPosition++).getLowerRange();
+            scan.withStartRow(rowKey, true);
+            scan.withStopRow(rowKey, true);
+ return true;
+ }
+
+ private RegionScanner getNewScanner() throws IOException {
+ if (lookupPosition >= pointLookupRanges.size()) {
+ return null;
}
- if (!hasMore) {
- // There is no more row from the HBase region scanner. We need
to check if PageFilter
- // has stopped the region scanner
- if (pagingFilter.isStopped()) {
- if (results.isEmpty()) {
- byte[] rowKey =
pagingFilter.getCurrentRowKeyToBeExcluded();
- LOGGER.info("Page filter stopped, generating dummy key
{} ",
- Bytes.toStringBinary(rowKey));
+ byte[] rowKey =
pointLookupRanges.get(lookupPosition++).getLowerRange();
+ byte[] adjustedRowKey = rowKey;
+ if (lookupKeyPrefix != null) {
+ int len = rowKey.length + lookupKeyPrefix.length;
+ adjustedRowKey = new byte[len];
+ System.arraycopy(lookupKeyPrefix, 0, adjustedRowKey, 0,
+ lookupKeyPrefix.length);
+ System.arraycopy(rowKey, 0, adjustedRowKey,
lookupKeyPrefix.length,
+ rowKey.length);
+ }
+ scan.withStartRow(adjustedRowKey, true);
+ scan.withStopRow(adjustedRowKey, true);
+ return region.getScanner(scan);
+ }
+
+ private boolean hasMore() {
+ return lookupPosition < pointLookupRanges.size();
+ }
+ private boolean next(List<Cell> results, boolean raw, RegionScanner
scanner)
+ throws IOException {
+ try {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ while (true) {
+ if (raw ? scanner.nextRaw(results) :
scanner.next(results)) {
+ // Since each scan is supposed to return only one row
(even when the
+ // start and stop row key are not the same, which
happens after region
+ // moves or when there are delete markers in the
table), this should not
+ // happen
+ LOGGER.warn("Each scan is supposed to return only one
row, scan " + scan
+ + ", region " + region);
+ }
+ if (!results.isEmpty()) {
+ return hasMore();
+ }
+ // The scanner returned an empty result. This means that
one of the rows
+ // has been deleted.
+ if (!hasMore()) {
+ return false;
+ }
+
+ if (EnvironmentEdgeManager.currentTimeMillis() - startTime
> pageSizeMs) {
+ byte[] rowKey = pointLookupRanges.get(lookupPosition -
1).getLowerRange();
ScanUtil.getDummyResult(rowKey, results);
+ return true;
+ }
+
+ RegionScanner regionScanner = getNewScanner();
+ if (regionScanner == null) {
+ return false;
}
- return true;
+ scanner.close();
+ scanner = regionScanner;
}
+ } catch (Exception e) {
+ lookupPosition--;
+ throw e;
+ } finally {
+ scanner.close();
+ }
+ }
+ }
+
+ public PagingRegionScanner(Region region, RegionScanner scanner, Scan
scan) {
+ super(scanner);
+ this.region = region;
+ this.scan = scan;
+ pagingFilter = ScanUtil.getPhoenixPagingFilter(scan);
+ }
+
+ void init() throws IOException {
+ if (initialized) {
+ return;
+ }
+ TableDescriptor tableDescriptor = region.getTableDescriptor();
+ BloomType bloomFilterType =
tableDescriptor.getColumnFamilies()[0].getBloomFilterType();
+ if (bloomFilterType == BloomType.ROW) {
+            // Check if the scan is a multi-key point lookup scan; if so,
remove it from the scan
+ SkipScanFilter skipScanFilter =
ScanUtil.removeSkipScanFilter(scan);
+ if (skipScanFilter != null) {
+ multiKeyPointLookup = new MultiKeyPointLookup(skipScanFilter);
+ }
+ }
+ initialized = true;
+ }
+
+ private boolean next(List<Cell> results, boolean raw) throws IOException {
+ init();
+ if (pagingFilter != null) {
+ pagingFilter.init();
+ }
+ byte[] adjustedStartRowKey =
+
scan.getAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY);
+ byte[] adjustedStartRowKeyIncludeBytes =
+
scan.getAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE);
+ // If scanners at higher level needs to re-scan the data that were
already scanned
+ // earlier, they can provide adjusted new start row key for the scan
and whether to
+ // include it.
+ // If they are set as the scan attributes, close the scanner, reopen
it with
+ // updated start row key and whether to include it. Update mvcc read
point from the
+ // previous scanner and set it back to the new scanner to maintain the
read
+ // consistency for the given region.
+ // Once done, continue the scan operation and reset the attributes.
+ if (adjustedStartRowKey != null && adjustedStartRowKeyIncludeBytes !=
null) {
+ long mvccReadPoint = delegate.getMvccReadPoint();
+ delegate.close();
+ scan.withStartRow(adjustedStartRowKey,
+ Bytes.toBoolean(adjustedStartRowKeyIncludeBytes));
+ PackagePrivateFieldAccessor.setMvccReadPoint(scan, mvccReadPoint);
+ if (multiKeyPointLookup != null
+ &&
!multiKeyPointLookup.verifyStartRowKey(adjustedStartRowKey)) {
return false;
- } else {
- // We got a row from the HBase scanner within the configured
time (i.e., the page size). We need to
- // start a new page on the next next() call.
- return true;
}
- } catch (Exception e) {
- if (pagingFilter != null) {
- pagingFilter.init();
+ delegate = region.getScanner(scan);
+
scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY, null);
+
scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE,
null);
+
+ } else {
+ if (multiKeyPointLookup != null) {
+ RegionScanner regionScanner =
multiKeyPointLookup.getNewScanner();
+ if (regionScanner == null) {
+ return false;
+ }
+ delegate.close();
+ delegate = regionScanner;
+ }
+ }
+
+ if (multiKeyPointLookup != null) {
+ return multiKeyPointLookup.next(results, raw, delegate);
+ }
+ boolean hasMore = raw ? delegate.nextRaw(results) :
delegate.next(results);
+ if (pagingFilter == null) {
+ return hasMore;
+ }
+ if (!hasMore) {
+ // There is no more row from the HBase region scanner. We need to
check if
+ // PagingFilter has stopped the region scanner
+ if (pagingFilter.isStopped()) {
+ if (results.isEmpty()) {
+ byte[] rowKey =
pagingFilter.getCurrentRowKeyToBeExcluded();
+ LOGGER.info("Page filter stopped, generating dummy key {}
",
+ Bytes.toStringBinary(rowKey));
+ ScanUtil.getDummyResult(rowKey, results);
+ }
+ return true;
}
- throw e;
+ return false;
+ } else {
+ // We got a row from the HBase scanner within the configured time
(i.e.,
+ // the page size). We need to start a new page on the next next()
call.
+ return true;
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index a9dd460599..ec61489af7 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -198,7 +198,7 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
dataScan.setTimeRange(scan.getTimeRange().getMin(),
scan.getTimeRange().getMax());
scanRanges.initializeScan(dataScan);
SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
- dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
+ dataScan.setFilter(new SkipScanFilter(skipScanFilter, false,
true));
dataScan.setAttribute(SERVER_PAGE_SIZE_MS,
Bytes.toBytes(Long.valueOf(pageSizeMs)));
return dataScan;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
index eeab9f4264..60b8b5a936 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/covered/data/CachedLocalTable.java
@@ -162,7 +162,7 @@ public class CachedLocalTable implements LocalHBaseState {
*/
long timestamp =
getMaxTimestamp(dataTableMutationsWithSameRowKeyAndTimestamp);
scan.setTimeRange(0, timestamp);
- scan.setFilter(new SkipScanFilter(skipScanFilter, true));
+ scan.setFilter(new SkipScanFilter(skipScanFilter, true, true));
} else {
assert scan.isRaw();
scan.readVersions(1);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
index d154ecdfd5..04416fa2fe 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InListIT.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.end2end;
import static java.util.Collections.singletonList;
+import static
org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN;
import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
@@ -172,10 +173,13 @@ public class InListIT extends ParallelStatsDisabledIT {
private static String prefix = generateUniqueName();
boolean checkMaxSkipScanCardinality = true;
+ boolean bloomFilter = true;
- public InListIT(boolean param) throws Exception {
+ public InListIT(boolean param1, boolean param2) throws Exception {
// Setup max skip scan size appropriate for the tests.
- checkMaxSkipScanCardinality = param;
+ checkMaxSkipScanCardinality = param1;
+        // Run tests with and without bloom filter
+ bloomFilter = param2;
Map<String, String> DEFAULT_PROPERTIES = new HashMap<String, String>()
{{
put(QueryServices.MAX_IN_LIST_SKIP_SCAN_SIZE,
checkMaxSkipScanCardinality ? String.valueOf(15) : String.valueOf(-1));
}};
@@ -184,10 +188,13 @@ public class InListIT extends ParallelStatsDisabledIT {
}
- @Parameterized.Parameters(name="checkMaxSkipScanCardinality = {0}")
- public static synchronized Collection<Boolean[]> data() {
- return Arrays.asList(new Boolean[][] {
- { false },{ true }
+ @Parameterized.Parameters(name="checkMaxSkipScanCardinality = {0},
bloomFilter = {1}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {true, true},
+ {true, false},
+ {false, true},
+ {false, false}
});
}
@@ -272,7 +279,8 @@ public class InListIT extends ParallelStatsDisabledIT {
* @param isMultiTenant whether or not the table needs a tenant_id column
* @return the final DDL statement
*/
- private static String createTableDDL(String tableName, PDataType pkType,
int saltBuckets, boolean isMultiTenant) {
+ private static String createTableDDL(String tableName, PDataType pkType,
int saltBuckets,
+ boolean isMultiTenant, boolean isBloomFilterEnabled) {
StringBuilder ddlBuilder = new StringBuilder();
ddlBuilder.append("CREATE TABLE ").append(tableName).append(" ( ");
@@ -302,7 +310,12 @@ public class InListIT extends ParallelStatsDisabledIT {
if(isMultiTenant) {
ddlBuilder.append("MULTI_TENANT=true");
}
-
+ if (isBloomFilterEnabled) {
+ if (saltBuckets != 0 || isMultiTenant) {
+ ddlBuilder.append(", ");
+ }
+ ddlBuilder.append("BLOOMFILTER='ROW'");
+ }
return ddlBuilder.toString();
}
@@ -316,9 +329,12 @@ public class InListIT extends ParallelStatsDisabledIT {
* @param saltBuckets the number of salt buckets if the table is salted,
otherwise 0
* @return the table or view name that should be used to access the
created table
*/
- private static String initializeAndGetTable(Connection baseConn,
Connection conn, boolean isMultiTenant, PDataType pkType, int saltBuckets)
throws SQLException {
+ private static String initializeAndGetTable(Connection baseConn,
Connection conn,
+ boolean isMultiTenant, PDataType pkType, int saltBuckets, boolean
isBloomFilterEnabled)
+ throws SQLException {
String tableName = getTableName(isMultiTenant, pkType, saltBuckets);
- String tableDDL = createTableDDL(tableName, pkType, saltBuckets,
isMultiTenant);
+ String tableDDL = createTableDDL(tableName, pkType, saltBuckets,
+ isMultiTenant, isBloomFilterEnabled);
baseConn.createStatement().execute(tableDDL);
// if requested, create a tenant specific view and return the view
name instead
@@ -440,7 +456,7 @@ public class InListIT extends ParallelStatsDisabledIT {
// use a different table with a unique name for each
variation
String tableName =
initializeAndGetTable(baseConn, conn,
isMultiTenant, pkType,
- saltBuckets);
+ saltBuckets, bloomFilter);
// upsert the given data
for (String upsertBody : DEFAULT_UPSERT_BODIES) {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 59dea4e367..43a8ab1a60 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -173,7 +173,8 @@ public class TableTTLIT extends BaseTest {
int flushCounter = RAND.nextInt(maxFlushCounter) + 1;
int maskingCounter = RAND.nextInt(maxMaskingCounter) + 1;
int verificationCounter = RAND.nextInt(maxVerificationCounter) + 1;
- for (int i = 0; i < 500; i++) {
+ int maxIterationCount = multiCF ? 250 : 500;
+ for (int i = 0; i < maxIterationCount; i++) {
if (flushCounter-- == 0) {
injectEdge.incrementValue(1000);
LOG.info("Flush " + i + " current time: " +
injectEdge.currentTime());
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
index 6e4d17121c..7fa819fad6 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/compile/WhereCompilerTest.java
@@ -139,7 +139,7 @@ public class WhereCompilerTest extends
BaseConnectionlessQueryTest {
ImmutableList.of(Arrays.asList(
pointRange("i1"),
pointRange("i2"))),
- SchemaUtil.VAR_BINARY_SCHEMA),
+ SchemaUtil.VAR_BINARY_SCHEMA, false),
singleKVFilter(
or(constantComparison(CompareOperator.EQUAL,id,"i1"),
and(constantComparison(CompareOperator.EQUAL,id,"i2"),
@@ -682,7 +682,7 @@ public class WhereCompilerTest extends
BaseConnectionlessQueryTest {
pointRange(tenantId1),
pointRange(tenantId2),
pointRange(tenantId3))),
- plan.getTableRef().getTable().getRowKeySchema()),
+ plan.getTableRef().getTable().getRowKeySchema(), false),
filter);
}
@@ -705,7 +705,7 @@ public class WhereCompilerTest extends
BaseConnectionlessQueryTest {
pointRange(tenantId1),
pointRange(tenantId2),
pointRange(tenantId3))),
- plan.getTableRef().getTable().getRowKeySchema()),
+ plan.getTableRef().getTable().getRowKeySchema(), false),
filter);
byte[] startRow = PVarchar.INSTANCE.toBytes(tenantId1);
@@ -738,7 +738,7 @@ public class WhereCompilerTest extends
BaseConnectionlessQueryTest {
Arrays.asList(
pointRange(tenantId,entityId1),
pointRange(tenantId,entityId2))),
- SchemaUtil.VAR_BINARY_SCHEMA),
+ SchemaUtil.VAR_BINARY_SCHEMA, false),
filter);
}
@@ -768,7 +768,7 @@ public class WhereCompilerTest extends
BaseConnectionlessQueryTest {
true,
Bytes.toBytes(entityId2),
true, SortOrder.ASC))),
- plan.getTableRef().getTable().getRowKeySchema()),
+ plan.getTableRef().getTable().getRowKeySchema(), false),
filter);
}
@@ -792,7 +792,7 @@ public class WhereCompilerTest extends
BaseConnectionlessQueryTest {
pointRange(tenantId1, entityId),
pointRange(tenantId2, entityId),
pointRange(tenantId3, entityId))),
- SchemaUtil.VAR_BINARY_SCHEMA),
+ SchemaUtil.VAR_BINARY_SCHEMA, false),
filter);
}
@Test
@@ -846,7 +846,7 @@ public class WhereCompilerTest extends
BaseConnectionlessQueryTest {
pointRange(tenantId1, entityId2),
pointRange(tenantId2, entityId1),
pointRange(tenantId2, entityId2))),
- SchemaUtil.VAR_BINARY_SCHEMA),
+ SchemaUtil.VAR_BINARY_SCHEMA, false),
filter);
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterIntersectTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterIntersectTest.java
index d3597b78ed..1e5314d264 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterIntersectTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterIntersectTest.java
@@ -55,7 +55,7 @@ public class SkipScanFilterIntersectTest {
public SkipScanFilterIntersectTest(List<List<KeyRange>> slots,
RowKeySchema schema, byte[] lowerInclusiveKey,
byte[] upperExclusiveKey, List<List<KeyRange>> expectedNewSlots) {
- this.filter = new SkipScanFilter(slots, schema);
+ this.filter = new SkipScanFilter(slots, schema, false);
this.lowerInclusiveKey = lowerInclusiveKey;
this.upperExclusiveKey = upperExclusiveKey;
this.expectedNewSlots = expectedNewSlots;
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
index ceeaa6bcdd..1e02c9de9d 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/filter/SkipScanFilterTest.java
@@ -95,9 +95,9 @@ public class SkipScanFilterTest extends TestCase {
}, width <= 0, SortOrder.getDefault());
}
if(slotSpans==null) {
- skipper = new SkipScanFilter(cnf, builder.build());
+ skipper = new SkipScanFilter(cnf, builder.build(), false);
} else {
- skipper = new SkipScanFilter(cnf, slotSpans,builder.build());
+ skipper = new SkipScanFilter(cnf, slotSpans,builder.build(),
false);
}
}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 8433906f37..b6e367a77b 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -291,7 +291,7 @@ public class ParallelIteratorsSplitTest extends
BaseConnectionlessQueryTest {
}
private static Collection<?> foreach(ScanRanges scanRanges, int[] widths,
KeyRange[] expectedSplits) {
- SkipScanFilter filter = new SkipScanFilter(scanRanges.getRanges(),
buildSchema(widths));
+ SkipScanFilter filter = new SkipScanFilter(scanRanges.getRanges(),
buildSchema(widths), false);
Scan scan = new
Scan().setFilter(filter).withStartRow(KeyRange.UNBOUND).withStopRow(KeyRange.UNBOUND,
true);
List<Object> ret = Lists.newArrayList();
ret.add(new Object[] {scan, scanRanges,
Arrays.<KeyRange>asList(expectedSplits)});
@@ -301,7 +301,7 @@ public class ParallelIteratorsSplitTest extends
BaseConnectionlessQueryTest {
private static Collection<?> foreach(KeyRange[][] ranges, int[] widths,
KeyRange[] expectedSplits) {
RowKeySchema schema = buildSchema(widths);
List<List<KeyRange>> slots =
Lists.transform(Lists.newArrayList(ranges), ARRAY_TO_LIST);
- SkipScanFilter filter = new SkipScanFilter(slots, schema);
+ SkipScanFilter filter = new SkipScanFilter(slots, schema, false);
// Always set start and stop key to max to verify we are using the
information in skipscan
// filter over the scan's KMIN and KMAX.
Scan scan = new Scan().setFilter(filter);