This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new e1caf95670 PHOENIX-7707 Phoenix server paging on valid rows (#2294)
e1caf95670 is described below
commit e1caf956708f06e71440605b603eed20a094cf55
Author: tkhurana <[email protected]>
AuthorDate: Wed Oct 8 17:51:12 2025 -0700
PHOENIX-7707 Phoenix server paging on valid rows (#2294)
---
.../hbase/regionserver/PhoenixScannerContext.java | 139 ++++++++++++
.../hbase/regionserver/ScannerContextUtil.java | 67 ------
.../coprocessor/BaseScannerRegionObserver.java | 34 ++-
.../phoenix/coprocessor/DelegateRegionScanner.java | 15 +-
.../GroupedAggregateRegionObserver.java | 26 ++-
.../phoenix/coprocessor/HashJoinRegionScanner.java | 8 +-
.../phoenix/coprocessor/PagingRegionScanner.java | 31 ++-
.../phoenix/coprocessor/TTLRegionScanner.java | 25 ++-
.../coprocessor/UncoveredIndexRegionScanner.java | 28 ++-
.../UncoveredLocalIndexRegionScanner.java | 6 +-
.../UngroupedAggregateRegionObserver.java | 11 -
.../UngroupedAggregateRegionScanner.java | 34 ++-
.../apache/phoenix/index/GlobalIndexChecker.java | 7 +-
.../iterate/NonAggregateRegionScannerFactory.java | 65 ++++--
.../phoenix/iterate/RegionScannerFactory.java | 6 -
.../iterate/RegionScannerResultIterator.java | 16 +-
.../org/apache/phoenix/end2end/ServerPagingIT.java | 239 ++++++++++++++++++++-
.../end2end/index/GlobalIndexCheckerIT.java | 2 +-
.../phoenix/monitoring/CountRowsScannedIT.java | 32 +++
19 files changed, 587 insertions(+), 204 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java
b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java
new file mode 100644
index 0000000000..49d10685fc
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ScannerContext has all methods package visible. To properly update the context progress for our
+ * scanners we need this helper
+ */
+public class PhoenixScannerContext extends ScannerContext {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixScannerContext.class);
+
+ // tracks the start time of the rpc on the server for server paging
+ private final long startTime;
+
+ /**
+ * The scanner remains open on the server during the course of multiple scan rpc requests. We need
+ * a way to determine during the next() call if it is a new scan rpc request on the same scanner.
+ * This is needed so that we can reset the start time for server paging. Every scan rpc request
+ * creates a new ScannerContext which has the lastPeekedCell set to null in the beginning.
+ * Subsequent next() calls will set this field in the ScannerContext.
+ */
+ public static boolean isNewScanRpcRequest(ScannerContext scannerContext) {
+ return scannerContext != null && scannerContext.getLastPeekedCell() == null;
+ }
+
+ public PhoenixScannerContext(ScannerContext hbaseContext) {
+ // set limits to null to create no limit context
+ super(Objects.requireNonNull(hbaseContext).keepProgress, null,
+ Objects.requireNonNull(hbaseContext).isTrackingMetrics());
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
+ }
+
+ public PhoenixScannerContext(boolean trackMetrics) {
+ super(false, null, trackMetrics);
+ startTime = EnvironmentEdgeManager.currentTimeMillis();
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void incrementSizeProgress(List<Cell> cells) {
+ for (Cell cell : cells) {
+ super.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
+ cell.heapSize());
+ }
+ }
+
+ /**
+ * returnImmediately is a private field in ScannerContext and there is no getter API on it. But
+ * the checkTimeLimit API on the ScannerContext will return true if returnImmediately is set.
+ */
+ public boolean isReturnImmediately() {
+ return checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
+ }
+
+ /**
+ * Update the scanner context created by RSRpcServices so that it can act accordingly
+ * @param dst hbase scanner context created on every new scan rpc request
+ * @param result list of cells to be returned to the client as scan rpc response
+ */
+ public void updateHBaseScannerContext(ScannerContext dst, List<Cell> result) {
+ if (dst == null) {
+ return;
+ }
+ // update last peeked cell
+ dst.setLastPeekedCell(getLastPeekedCell());
+ // update return immediately
+ if (isDummy(result) || isReturnImmediately()) {
+ // when a dummy row is returned by a lower layer, or a lower layer asked to return
+ // immediately, set returnImmediately on the ScannerContext to force HBase to return a
+ // response to the client
+ dst.returnImmediately();
+ }
+ // update metrics
+ if (isTrackingMetrics() && dst.isTrackingMetrics()) {
+ // getMetricsMap call resets the metrics internally
+ for (Map.Entry<String, Long> entry : getMetrics().getMetricsMap().entrySet()) {
+ dst.metrics.addToCounter(entry.getKey(), entry.getValue());
+ }
+ }
+ // update progress
+ dst.setProgress(getBatchProgress(), getDataSizeProgress(), getHeapSizeProgress());
+ }
+
+ public static boolean isTimedOut(ScannerContext context, long pageSizeMs) {
+ if (context == null || !(context instanceof PhoenixScannerContext)) {
+ return false;
+ }
+ PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
+ return EnvironmentEdgeManager.currentTimeMillis() - phoenixScannerContext.startTime
+ > pageSizeMs;
+ }
+
+ /**
+ * Set returnImmediately on the ScannerContext to true; it will have the same behavior as
+ * reaching the time limit. Use this to make RSRpcServices.scan return immediately.
+ */
+ public static void setReturnImmediately(ScannerContext context) {
+ if (context == null || !(context instanceof PhoenixScannerContext)) {
+ return;
+ }
+ PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
+ phoenixScannerContext.returnImmediately();
+ }
+
+ public static boolean isReturnImmediately(ScannerContext context) {
+ if (context == null || !(context instanceof PhoenixScannerContext)) {
+ return false;
+ }
+ PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
+ return phoenixScannerContext.isReturnImmediately();
+ }
+}
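
A minimal usage sketch of the helpers above (illustrative only, not part of this commit; the wrapper class, its delegate field, and the assumption of a non-null RPC-supplied ScannerContext are hypothetical): a fresh PhoenixScannerContext is created whenever a new scan RPC arrives on an open scanner so the paging clock restarts, the lower layers run against it, and its progress and returnImmediately signal are copied back to the HBase RPC context.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;

// Illustrative sketch, not part of this commit: a delegating scanner that uses
// PhoenixScannerContext for server paging. Class and field names are assumed.
class PerRpcPagingSketch {
  private final RegionScanner delegate;
  private PhoenixScannerContext phoenixContext;

  PerRpcPagingSketch(RegionScanner delegate) {
    this.delegate = delegate;
  }

  boolean next(List<Cell> results, ScannerContext hbaseContext) throws IOException {
    if (PhoenixScannerContext.isNewScanRpcRequest(hbaseContext)) {
      // a new scan rpc arrived on the same open scanner: restart the paging clock
      phoenixContext = new PhoenixScannerContext(hbaseContext);
    }
    // run the lower layers against the no-limit phoenix context
    boolean hasMore = delegate.nextRaw(results, phoenixContext);
    // copy progress, metrics and the returnImmediately signal back to the rpc context
    phoenixContext.updateHBaseScannerContext(hbaseContext, results);
    return hasMore;
  }
}
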
diff --git
a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
deleted file mode 100644
index 23bf60cde0..0000000000
---
a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.PrivateCellUtil;
-
-/**
- * ScannerContext has all methods package visible. To properly update the
context progress for our
- * scanners we need this helper
- */
-public class ScannerContextUtil {
- public static void incrementSizeProgress(ScannerContext sc, List<Cell>
cells) {
- for (Cell cell : cells) {
-
sc.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
cell.heapSize());
- }
- }
-
- public static void updateMetrics(ScannerContext src, ScannerContext dst) {
- if (src != null && dst != null && src.isTrackingMetrics() &&
dst.isTrackingMetrics()) {
- for (Map.Entry<String, Long> entry :
src.getMetrics().getMetricsMap().entrySet()) {
- dst.metrics.addToCounter(entry.getKey(), entry.getValue());
- }
- }
- }
-
- public static ScannerContext copyNoLimitScanner(ScannerContext sc) {
- return new ScannerContext(sc.keepProgress, null, sc.isTrackingMetrics());
- }
-
- public static void updateTimeProgress(ScannerContext sc) {
- sc.updateTimeProgress();
- }
-
- /**
- * Set returnImmediately on the ScannerContext to true, it will have the
same behavior as reaching
- * the time limit. Use this to make RSRpcService.scan return immediately.
- */
- public static void setReturnImmediately(ScannerContext sc) {
- sc.returnImmediately();
- }
-
- /**
- * returnImmediately is a private field in ScannerContext and there is no
getter API on it But the
- * checkTimeLimit API on the ScannerContext will return true if
returnImmediately is set
- */
- public static boolean checkTimeLimit(ScannerContext sc) {
- return sc.checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
- }
-}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b0a134151f..723eaee039 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -37,12 +37,12 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -191,6 +191,8 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
private final Scan scan;
private final ObserverContext<RegionCoprocessorEnvironment> c;
private boolean wasOverriden;
+ // tracks the current phoenix scanner context corresponding to the hbase scanner context
+ private PhoenixScannerContext phoenixScannerContext;
public RegionScannerHolder(ObserverContext<RegionCoprocessorEnvironment>
c, Scan scan,
final RegionScanner scanner) {
@@ -248,10 +250,7 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
- overrideDelegate();
- boolean res = super.next(result, scannerContext);
- ScannerContextUtil.incrementSizeProgress(scannerContext, result);
- return res;
+ return nextInternal(result, scannerContext, false);
}
@Override
@@ -262,9 +261,30 @@ abstract public class BaseScannerRegionObserver implements
RegionObserver {
@Override
public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
throws IOException {
+ return nextInternal(result, scannerContext, true);
+ }
+
+ private boolean nextInternal(List<Cell> result, ScannerContext scannerContext, boolean isRaw)
+ throws IOException {
overrideDelegate();
- boolean res = super.next(result, scannerContext);
- ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+ if (scannerContext instanceof PhoenixScannerContext) {
+ // This is an optimization to avoid creating multiple phoenix scanner context objects for
+ // the same scan rpc request when multiple RegionScannerHolder objects are stacked, which
+ // happens when multiple coprocessors (not scanners), such as
+ // UngroupedAggregateRegionObserver and GlobalIndexChecker, are processing the scan.
+ phoenixScannerContext = (PhoenixScannerContext) scannerContext;
+ } else if (PhoenixScannerContext.isNewScanRpcRequest(scannerContext)) {
+ // An open scanner can process multiple scan rpcs during its lifetime.
+ // We need to create a new phoenix scanner context for every new scan rpc request.
+ phoenixScannerContext = new PhoenixScannerContext(scannerContext);
+ }
+ boolean res = isRaw
+ ? super.nextRaw(result, phoenixScannerContext)
+ : super.next(result, phoenixScannerContext);
+ if (!(scannerContext instanceof PhoenixScannerContext)) {
+ // only update the top level hbase scanner context
+ phoenixScannerContext.updateHBaseScannerContext(scannerContext, result);
+ }
return res;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index ad64e3ae58..4563f102fd 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -17,8 +17,6 @@
*/
package org.apache.phoenix.coprocessor;
-import static org.apache.phoenix.util.ScanUtil.isDummy;
-
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
@@ -27,7 +25,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
public class DelegateRegionScanner implements RegionScanner {
@@ -103,17 +100,7 @@ public class DelegateRegionScanner implements
RegionScanner {
private boolean next(List<Cell> result, boolean raw, ScannerContext
scannerContext)
throws IOException {
if (scannerContext != null) {
- ScannerContext noLimitContext =
ScannerContextUtil.copyNoLimitScanner(scannerContext);
- boolean hasMore =
- raw ? delegate.nextRaw(result, noLimitContext) : delegate.next(result,
noLimitContext);
- if (isDummy(result) ||
ScannerContextUtil.checkTimeLimit(noLimitContext)) {
- // when a dummy row is returned by a lower layer or if the result is
valid but the lower
- // layer signals us to return immediately, we need to set
returnImmediately
- // on the ScannerContext to force HBase to return a response to the
client
- ScannerContextUtil.setReturnImmediately(scannerContext);
- }
- ScannerContextUtil.updateMetrics(noLimitContext, scannerContext);
- return hasMore;
+ return raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext);
}
return raw ? delegate.nextRaw(result) : delegate.next(result);
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 39d9527a83..b7163c0a3f 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -81,7 +82,6 @@ import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
@@ -605,8 +605,6 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver
private boolean nextInternal(List<Cell> resultsToReturn, ScannerContext
scannerContext)
throws IOException {
boolean hasMore;
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
- long now;
Tuple result =
useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new
MultiKeyValueTuple();
boolean acquiredLock = false;
@@ -643,8 +641,11 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver
// Aggregate values here
aggregators.aggregate(rowAggregators, result);
}
- now = EnvironmentEdgeManager.currentTimeMillis();
- if (hasMore && groupByCache.size() < limit && (now - startTime) >=
pageSizeMs) {
+ if (
+ hasMore && groupByCache.size() < limit
+ && (PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs))
+ ) {
return getDummyResult(resultsToReturn);
}
} while (hasMore && groupByCache.size() < limit);
@@ -784,8 +785,7 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver
boolean hasMore;
boolean atLimit;
boolean aggBoundary = false;
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
- long now;
+ boolean pageTimeout = false;
Tuple result =
useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new
MultiKeyValueTuple();
ImmutableBytesPtr key = null;
@@ -835,8 +835,14 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver
atLimit = rowCount + countOffset >= limit;
// Do rowCount + 1 b/c we don't have to wait for a complete
// row in the case of a DISTINCT with a LIMIT
- now = EnvironmentEdgeManager.currentTimeMillis();
- } while (hasMore && !aggBoundary && !atLimit && (now - startTime) <
pageSizeMs);
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
+ pageTimeout = true;
+ break;
+ }
+ } while (hasMore && !aggBoundary && !atLimit && !pageTimeout);
}
} catch (Exception e) {
LOGGER.error("Ordered group-by scanner next encountered error for
region {}",
@@ -850,7 +856,7 @@ public class GroupedAggregateRegionObserver extends
BaseScannerRegionObserver
if (acquiredLock) region.closeRegionOperation();
}
try {
- if (hasMore && !aggBoundary && !atLimit && (now - startTime) >=
pageSizeMs) {
+ if (hasMore && !aggBoundary && !atLimit && pageTimeout) {
updateDummyWithPrevRowKey(results, initStartRowKey,
includeInitStartRowKey, scan);
return true;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index e56d9028f4..ab62a7ddf7 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
@@ -59,7 +60,6 @@ import org.apache.phoenix.schema.tuple.ResultTuple;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.util.ClientUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.TupleUtil;
public class HashJoinRegionScanner implements RegionScanner {
@@ -306,7 +306,6 @@ public class HashJoinRegionScanner implements RegionScanner
{
private boolean next(List<Cell> result, boolean raw, ScannerContext
scannerContext)
throws IOException {
try {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (shouldAdvance()) {
if (scannerContext != null) {
hasMore =
@@ -322,7 +321,10 @@ public class HashJoinRegionScanner implements
RegionScanner {
}
Cell cell = result.get(0);
processResults(result, false);
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime >=
pageSizeMs) {
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
byte[] rowKey = CellUtil.cloneRow(cell);
result.clear();
getDummyResult(rowKey, result);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
index e3f1d6afea..7cde2a393f 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
@@ -26,6 +26,7 @@ import
org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -34,7 +35,6 @@ import org.apache.phoenix.filter.PagingFilter;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,17 +60,16 @@ public class PagingRegionScanner extends BaseRegionScanner {
private PagingFilter pagingFilter;
private MultiKeyPointLookup multiKeyPointLookup = null;
private boolean initialized = false;
+ private long pageSizeMs;
private class MultiKeyPointLookup {
private SkipScanFilter skipScanFilter;
private List<KeyRange> pointLookupRanges = null;
private int lookupPosition = 0;
private byte[] lookupKeyPrefix = null;
- private long pageSizeMs;
private MultiKeyPointLookup(SkipScanFilter skipScanFilter) throws
IOException {
this.skipScanFilter = skipScanFilter;
- pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
pointLookupRanges = skipScanFilter.getPointLookupKeyRanges();
lookupPosition = findLookupPosition(scan.getStartRow());
if (skipScanFilter.getOffset() > 0) {
@@ -133,7 +132,6 @@ public class PagingRegionScanner extends BaseRegionScanner {
private boolean next(List<Cell> results, boolean raw, RegionScanner
scanner,
ScannerContext scannerContext) throws IOException {
try {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
while (true) {
boolean hasMore;
if (scannerContext != null) {
@@ -152,15 +150,19 @@ public class PagingRegionScanner extends
BaseRegionScanner {
"Each scan is supposed to return only one row, scan " + scan +
", region " + region);
}
if (!results.isEmpty()) {
+ if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
+ // we got a valid result but the scanner timed out, so return immediately
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
+ }
return hasMore();
}
// The scanner returned an empty result. This means that one of the
rows
- // has been deleted.
+ // has been deleted or the row key is not present in the table.
if (!hasMore()) {
return false;
}
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
pageSizeMs) {
+ if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
byte[] rowKey = pointLookupRanges.get(lookupPosition -
1).getLowerRange();
ScanUtil.getDummyResult(rowKey, results);
return true;
@@ -187,6 +189,7 @@ public class PagingRegionScanner extends BaseRegionScanner {
this.region = region;
this.scan = scan;
pagingFilter = ScanUtil.getPhoenixPagingFilter(scan);
+ pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
}
@VisibleForTesting
@@ -258,10 +261,11 @@ public class PagingRegionScanner extends
BaseRegionScanner {
}
}
+ boolean hasMore;
if (multiKeyPointLookup != null) {
return multiKeyPointLookup.next(results, raw, delegate, scannerContext);
}
- boolean hasMore;
+
if (scannerContext != null) {
hasMore =
raw ? delegate.nextRaw(results, scannerContext) :
delegate.next(results, scannerContext);
@@ -277,16 +281,23 @@ public class PagingRegionScanner extends
BaseRegionScanner {
if (pagingFilter.isStopped()) {
if (results.isEmpty()) {
byte[] rowKey = pagingFilter.getCurrentRowKeyToBeExcluded();
- LOGGER.info("Page filter stopped, generating dummy key {} ",
- Bytes.toStringBinary(rowKey));
+ LOGGER.info("{} Paging filter stopped, generating dummy key {} ",
+ getRegionInfo().getRegionNameAsString(),
Bytes.toStringBinary(rowKey));
ScanUtil.getDummyResult(rowKey, results);
+ } else {
+ // we got a valid result but page filter stopped set return
immediately
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
}
return true;
}
return false;
} else {
// We got a row from the HBase scanner within the configured time (i.e.,
- // the page size). We need to start a new page on the next next() call.
+ // the page size).
+ if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
+ // we got a valid result but the scanner timed out, so return immediately
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
+ }
return true;
}
}
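
The dummy-row convention used above can be shown with a small sketch (assumed test-style code, not part of the commit): a dummy result carries only a placeholder cell for the resume row key, and ScanUtil.isDummy() is how callers distinguish it from a real row.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.util.ScanUtil;

// Illustrative sketch, not part of this commit: building and recognizing a
// dummy (placeholder) result. The row key value is arbitrary.
public class DummyRowSketch {
  public static void main(String[] args) {
    List<Cell> page = new ArrayList<>();
    byte[] resumeKey = Bytes.toBytes("row-0042"); // hypothetical resume row key
    ScanUtil.getDummyResult(resumeKey, page);
    // a dummy row is a paging placeholder, not data to surface to the user
    System.out.println("dummy placeholder: " + ScanUtil.isDummy(page));
  }
}
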
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 8a4fc50299..6ed234db6d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.schema.CompiledTTLExpression;
@@ -212,7 +213,8 @@ public class TTLRegionScanner extends BaseRegionScanner {
return false;
}
- private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore)
throws IOException {
+ private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore,
+ ScannerContext scannerContext) throws IOException {
boolean expired = isExpired(result);
if (!expired) {
return hasMore;
@@ -221,23 +223,28 @@ public class TTLRegionScanner extends BaseRegionScanner {
if (!hasMore) {
return false;
}
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
do {
- hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+ hasMore =
+ raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext);
if (result.isEmpty() || ScanUtil.isDummy(result)) {
- return hasMore;
+ break;
}
+ // non-dummy result, check if it is expired
if (!isExpired(result)) {
- return hasMore;
+ break;
}
+ // result is expired
Cell cell = result.get(0);
result.clear();
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs)
{
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
ScanUtil.getDummyResult(CellUtil.cloneRow(cell), result);
- return hasMore;
+ break;
}
} while (hasMore);
- return false;
+ return hasMore;
}
private boolean next(List<Cell> result, boolean raw, ScannerContext
scannerContext)
@@ -267,7 +274,7 @@ public class TTLRegionScanner extends BaseRegionScanner {
if (result.isEmpty() || ScanUtil.isDummy(result)) {
return hasMore;
}
- hasMore = skipExpired(result, raw, hasMore);
+ hasMore = skipExpired(result, raw, hasMore, scannerContext);
if (result.isEmpty() || ScanUtil.isDummy(result)) {
return hasMore;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 6e1b853bac..1b126a6591 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -210,9 +211,8 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
}
}
- protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
- final byte[] actualStartKey, final int offset, ScannerContext
scannerContext)
- throws IOException {
+ protected boolean scanIndexTableRows(List<Cell> result, final byte[] actualStartKey,
+ final int offset, ScannerContext scannerContext) throws IOException {
boolean hasMore = false;
if (actualStartKey != null) {
do {
@@ -231,7 +231,10 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
firstCell.getRowLength(), actualStartKey, 0,
actualStartKey.length) < 0
) {
result.clear();
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime >=
pageSizeMs) {
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
byte[] rowKey = CellUtil.cloneRow(firstCell);
ScanUtil.getDummyResult(rowKey, result);
return true;
@@ -270,7 +273,10 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
viewConstants));
indexRows.add(row);
indexRowCount++;
- if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() -
startTime) >= pageSizeMs) {
+ if (
+ hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ || PhoenixScannerContext.isReturnImmediately(scannerContext))
+ ) {
getDummyResult(lastIndexRowKey, result);
// We do not need to change the state, State.SCANNING_INDEX
// since we will continue scanning the index table after
@@ -283,9 +289,9 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
return hasMore;
}
- protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
- ScannerContext scannerContext) throws IOException {
- return scanIndexTableRows(result, startTime, null, 0, scannerContext);
+ protected boolean scanIndexTableRows(List<Cell> result, ScannerContext scannerContext)
+ throws IOException {
+ return scanIndexTableRows(result, null, 0, scannerContext);
}
private boolean verifyIndexRowAndRepairIfNecessary(Result dataRow, byte[]
indexRowKey,
@@ -393,7 +399,9 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
*/
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws
IOException {
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ long startTime = (scannerContext != null)
+ ? ((PhoenixScannerContext) scannerContext).getStartTime()
+ : EnvironmentEdgeManager.currentTimeMillis();
boolean hasMore;
region.startRegionOperation();
try {
@@ -409,7 +417,7 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
state = State.SCANNING_INDEX;
}
if (state == State.SCANNING_INDEX) {
- hasMore = scanIndexTableRows(result, startTime, scannerContext);
+ hasMore = scanIndexTableRows(result, scannerContext);
if (isDummy(result)) {
updateDummyWithPrevRowKey(result, initStartRowKey,
includeInitStartRowKey, scan);
return hasMore;
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index 34d9fb67b9..6b5d124ce0 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -105,9 +105,9 @@ public class UncoveredLocalIndexRegionScanner extends
UncoveredIndexRegionScanne
}
@Override
- protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
- ScannerContext scannerContext) throws IOException {
- return scanIndexTableRows(result, startTime, actualStartKey, offset,
scannerContext);
+ protected boolean scanIndexTableRows(List<Cell> result, ScannerContext scannerContext)
+ throws IOException {
+ return scanIndexTableRows(result, actualStartKey, offset, scannerContext);
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 7fdfdeeb60..d85f800540 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -766,13 +765,6 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
private RegionScanner collectStats(final RegionScanner innerScanner,
StatisticsCollector stats,
final Region region, final Scan scan, Configuration config) throws
IOException {
- ScannerContext groupScannerContext;
- if (scan.isScanMetricsEnabled()) {
- groupScannerContext =
-
ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build();
- } else {
- groupScannerContext = null;
- }
StatsCollectionCallable callable =
new StatsCollectionCallable(stats, region, innerScanner, config, scan);
byte[] asyncBytes =
@@ -826,9 +818,6 @@ public class UngroupedAggregateRegionObserver extends
BaseScannerRegionObserver
@Override
public boolean next(List<Cell> results, ScannerContext scannerContext)
throws IOException {
- if (groupScannerContext != null && scannerContext != null) {
- ScannerContextUtil.updateMetrics(groupScannerContext,
scannerContext);
- }
return next(results);
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 20ace9a02e..1a4794e86e 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -61,10 +61,10 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.GlobalCache;
@@ -110,7 +110,6 @@ import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.ClientUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ExpressionUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.LogUtil;
@@ -604,8 +603,6 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
public boolean next(List<Cell> resultsToReturn, ScannerContext
scannerContext)
throws IOException {
boolean hasMore;
- boolean returnImmediately = false;
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
Configuration conf = env.getConfiguration();
final TenantCache tenantCache = GlobalCache.getTenantCache(env,
ScanUtil.getTenantId(scan));
try (MemoryManager.MemoryChunk em =
tenantCache.getMemoryManager().allocate(0)) {
@@ -648,10 +645,10 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
resultsToReturn.addAll(results);
return true;
}
- // we got a dummy result from the lower scanner but hasAny is true which means that
- // we have a valid result which can be returned to the client instead of a dummy.
- // We need to signal the RPC handler to return.
- returnImmediately = true;
+ // we got a page timeout from the lower scanner, but hasAny is true, which means we have a
+ // valid result that we can return to the client instead of a dummy; we still need to
+ // finish the rpc and release the handler
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
break;
}
if (!results.isEmpty()) {
@@ -705,13 +702,16 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
aggregators.aggregate(rowAggregators, result);
hasAny = true;
}
- } while (
- hasMore && (EnvironmentEdgeManager.currentTimeMillis() -
startTime) < pageSizeMs
- );
- if (EnvironmentEdgeManager.currentTimeMillis() - startTime >=
pageSizeMs) {
- // we hit a page scanner timeout, signal the RPC handler to return.
- returnImmediately = true;
- }
+ if (
+ PhoenixScannerContext.isReturnImmediately(scannerContext)
+ || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ ) {
+ // we could have a valid result which we can return to the client instead of a dummy,
+ // but we still need to finish the rpc and release the handler
+ PhoenixScannerContext.setReturnImmediately(scannerContext);
+ break;
+ }
+ } while (hasMore);
if (!mutations.isEmpty()) {
if (!isSingleRowDelete) {
annotateAndCommit(mutations);
@@ -763,10 +763,6 @@ public class UngroupedAggregateRegionScanner extends
BaseRegionScanner {
}
resultsToReturn.add(keyValue);
}
- if (returnImmediately && scannerContext != null) {
- // signal the RPC handler to return
- ScannerContextUtil.setReturnImmediately(scannerContext);
- }
return hasMore;
}
}
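
The comments in this hunk capture the core behavior change of PHOENIX-7707: on a page timeout, a scanner that already holds a valid row returns that row and merely asks the RPC layer to respond immediately, falling back to a dummy row only when nothing valid is in hand. A hedged sketch of that decision follows; the names finishPage, hasValidRow, lastRowKey and pageSizeMs are assumptions, not taken from the commit.

import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.util.ScanUtil;

// Illustrative sketch, not part of this commit: prefer returning a valid row on a
// page timeout and fall back to a dummy row only when nothing valid is in hand.
final class PageCompletionSketch {
  static boolean finishPage(List<Cell> results, ScannerContext scannerContext,
      boolean hasValidRow, byte[] lastRowKey, long pageSizeMs) {
    if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
      if (hasValidRow) {
        // the valid row stays in `results`; just end the scan rpc early
        PhoenixScannerContext.setReturnImmediately(scannerContext);
      } else {
        results.clear();
        ScanUtil.getDummyResult(lastRowKey, results); // placeholder to resume from
      }
    }
    return true; // hasMore: the scanner keeps its position for the next rpc
  }
}
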
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 8e82f17036..97f23c498f 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -256,7 +257,6 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
init();
initialized = true;
}
- long startTime = EnvironmentEdgeManager.currentTimeMillis();
do {
if (raw) {
hasMore = (scannerContext == null)
@@ -277,7 +277,10 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
if (verifyRowAndRepairIfNecessary(result)) {
break;
}
- if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() -
startTime) >= pageSizeMs) {
+ if (
+ hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+ || PhoenixScannerContext.isReturnImmediately(scannerContext))
+ ) {
byte[] rowKey = CellUtil.cloneRow(cell);
result.clear();
getDummyResult(rowKey, result);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index cf7832b794..65affd6e79 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -38,10 +38,10 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.cache.GlobalCache;
@@ -182,7 +182,11 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan);
RegionScannerResultIterator iterator = new
RegionScannerResultIterator(scan, innerScanner,
getMinMaxQualifiersFromScan(scan), encodingScheme);
- ScannerContext sc = iterator.getRegionScannerContext();
+ // we need to create our own scanner context because we are still opening the scanner
+ // and don't have an rpc scanner context, which is created in the next() call. This scanner
+ // context is used while we are skipping rows until we hit the offset
+ PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled());
+ iterator.setRegionScannerContext(sc);
innerScanner = getOffsetScanner(innerScanner,
new OffsetResultIterator(iterator, scanOffset,
getPageSizeMsForRegionScanner(scan),
isIncompatibleClient),
@@ -280,10 +284,15 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
EncodedColumnsUtil.getQualifierEncodingScheme(scan);
RegionScannerResultIterator inner = new
RegionScannerResultIterator(scan, s,
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
+ // we need to create our own scanner context because we are still opening the scanner
+ // and don't have an rpc scanner context, which is created in the next() call. This scanner
+ // context is used while we are iterating over the top n rows before the first next() call
+ PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled());
+ inner.setRegionScannerContext(sc);
OrderedResultIterator iterator = new OrderedResultIterator(inner,
orderByExpressions,
spoolingEnabled, thresholdBytes, limit >= 0 ? limit : null, null,
estimatedRowSize,
getPageSizeMsForRegionScanner(scan), scan, s.getRegionInfo());
- return new
OrderedResultIteratorWithScannerContext(inner.getRegionScannerContext(),
iterator);
+ return new OrderedResultIteratorWithScannerContext(sc, iterator);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
@@ -296,15 +305,15 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
private static class OrderedResultIteratorWithScannerContext {
- private ScannerContext scannerContext;
+ private PhoenixScannerContext scannerContext;
private OrderedResultIterator iterator;
- OrderedResultIteratorWithScannerContext(ScannerContext sc,
OrderedResultIterator ori) {
+ OrderedResultIteratorWithScannerContext(PhoenixScannerContext sc, OrderedResultIterator ori) {
this.scannerContext = sc;
this.iterator = ori;
}
- public ScannerContext getScannerContext() {
+ public PhoenixScannerContext getScannerContext() {
return scannerContext;
}
@@ -363,7 +372,7 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
private RegionScanner getOffsetScanner(final RegionScanner s, final
OffsetResultIterator iterator,
final boolean isLastScan, final boolean incompatibleClient, final Scan
scan,
- final ScannerContext sc) throws IOException {
+ final PhoenixScannerContext sc) throws IOException {
final Tuple firstTuple;
final Region region = getRegion();
region.startRegionOperation();
@@ -436,7 +445,9 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
return new BaseRegionScanner(s) {
private Tuple tuple = firstTuple;
private byte[] previousResultRowKey;
- private ScannerContext regionScannerContext = sc;
+ // scanner context used when we are opening the scanner and skipping up to offset rows.
+ // We copy this context to the hbase rpc context on the first next call
+ private PhoenixScannerContext regionScannerContext = sc;
@Override
public boolean isFilterDone() {
@@ -454,6 +465,18 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
if (isFilterDone()) {
return false;
}
+ if (regionScannerContext != null) {
+ regionScannerContext.updateHBaseScannerContext(scannerContext, results);
+ // we no longer need this context
+ regionScannerContext = null;
+ if (PhoenixScannerContext.isReturnImmediately(scannerContext)) {
+ return true;
+ }
+ }
+ RegionScannerResultIterator delegate =
+ (RegionScannerResultIterator) (iterator.getDelegate());
+ // just use the scanner context passed to us from now on
+ delegate.setRegionScannerContext(scannerContext);
Tuple nextTuple = iterator.next();
if (tuple.size() > 0 && !isDummy(tuple)) {
for (int i = 0; i < tuple.size(); i++) {
@@ -478,10 +501,6 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
}
tuple = nextTuple;
- if (regionScannerContext != null) {
- ScannerContextUtil.updateMetrics(regionScannerContext,
scannerContext);
- regionScannerContext = null;
- }
return !isFilterDone();
} catch (Throwable t) {
LOGGER.error("Error while iterating Offset scanner.", t);
@@ -541,7 +560,7 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
* region) since after this everything is held in memory
*/
private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final
RegionScanner s,
- final OrderedResultIterator iterator, ImmutableBytesPtr tenantId,
ScannerContext sc)
+ final OrderedResultIterator iterator, ImmutableBytesPtr tenantId,
PhoenixScannerContext sc)
throws Throwable {
final Tuple firstTuple;
@@ -565,7 +584,9 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
}
return new BaseRegionScanner(s) {
private Tuple tuple = firstTuple;
- private ScannerContext regionScannerContext = sc;
+ // scanner context used when we are opening the scanner and reading the topN rows.
+ // We copy this context to the hbase rpc context on the first next call
+ private PhoenixScannerContext regionScannerContext = sc;
@Override
public boolean isFilterDone() {
@@ -583,6 +604,14 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
if (isFilterDone()) {
return false;
}
+ if (regionScannerContext != null) {
+ regionScannerContext.updateHBaseScannerContext(scannerContext, results);
+ // we no longer need this context
+ regionScannerContext = null;
+ if (PhoenixScannerContext.isReturnImmediately(scannerContext)) {
+ return true;
+ }
+ }
if (isDummy(tuple)) {
ScanUtil.getDummyResult(CellUtil.cloneRow(tuple.getValue(0)),
results);
} else {
@@ -590,11 +619,11 @@ public class NonAggregateRegionScannerFactory extends
RegionScannerFactory {
results.add(tuple.getValue(i));
}
}
+ RegionScannerResultIterator delegate =
+ (RegionScannerResultIterator) (iterator.getDelegate());
+ // just use the scanner context passed to us from now on
+ delegate.setRegionScannerContext(scannerContext);
tuple = iterator.next();
- if (regionScannerContext != null) {
- ScannerContextUtil.updateMetrics(regionScannerContext,
scannerContext);
- regionScannerContext = null;
- }
return !isFilterDone();
} catch (Throwable t) {
ClientUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 9645ed413b..bda112e709 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
@@ -308,11 +307,6 @@ public abstract class RegionScannerFactory {
if (extraLimit >= 0 && --extraLimit == 0) {
return false;
}
- // There is a scanattribute set to retrieve the specific array
element
- if (scannerContext != null) {
- ScannerContextUtil.incrementSizeProgress(scannerContext, result);
- ScannerContextUtil.updateTimeProgress(scannerContext);
- }
return next;
} catch (Throwable t) {
ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(),
t);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index c0cc6ec84e..adbca1f859 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -43,7 +43,7 @@ public class RegionScannerResultIterator extends
BaseResultIterator {
private final Pair<Integer, Integer> minMaxQualifiers;
private final boolean useQualifierAsIndex;
private final QualifierEncodingScheme encodingScheme;
- private final ScannerContext regionScannerContext;
+ private ScannerContext scannerContext;
public RegionScannerResultIterator(Scan scan, RegionScanner scanner,
Pair<Integer, Integer> minMaxQualifiers, QualifierEncodingScheme
encodingScheme) {
@@ -51,12 +51,6 @@ public class RegionScannerResultIterator extends
BaseResultIterator {
this.useQualifierAsIndex =
EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
this.minMaxQualifiers = minMaxQualifiers;
this.encodingScheme = encodingScheme;
- if (scan.isScanMetricsEnabled()) {
- regionScannerContext =
-
ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build();
- } else {
- regionScannerContext = null;
- }
}
@Override
@@ -74,10 +68,10 @@ public class RegionScannerResultIterator extends
BaseResultIterator {
// since this is an indication of whether or not there are more values
after the
// ones returned
boolean hasMore;
- if (regionScannerContext == null) {
+ if (scannerContext == null) {
hasMore = scanner.nextRaw(results);
} else {
- hasMore = scanner.nextRaw(results, regionScannerContext);
+ hasMore = scanner.nextRaw(results, scannerContext);
}
if (!hasMore && results.isEmpty()) {
@@ -98,8 +92,8 @@ public class RegionScannerResultIterator extends
BaseResultIterator {
}
}
- public ScannerContext getRegionScannerContext() {
- return regionScannerContext;
+ public void setRegionScannerContext(ScannerContext scannerContext) {
+ this.scannerContext = scannerContext;
}
@Override
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
index 1a43161568..b3565f3376 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
import static
org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
import static
org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit;
+import static
org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.commitWithException;
import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
import static
org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -33,12 +34,17 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.coprocessor.PagingRegionScanner;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -60,6 +66,7 @@ import
org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
@Category(NeedsOwnMiniClusterTest.class)
public class ServerPagingIT extends ParallelStatsDisabledIT {
+ private static final Random RAND = new Random(11);
@BeforeClass
public static synchronized void doSetup() throws Exception {
@@ -262,6 +269,210 @@ public class ServerPagingIT extends
ParallelStatsDisabledIT {
assertEquals(D2.getTime(), rs.getDate(1).getTime());
assertFalse(rs.next());
assertServerPagingMetric(tablename, rs, true);
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED);
+ assertEquals(6, numRows);
+ long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // 3 regions * (2 rows per region + 1 scanner open with page timeout set to 0 ms)
+ assertEquals(9, numRpcs);
+ }
+ }
+ }
+
+ @Test
+ public void testMultiKeyPointLookup() throws Exception {
+ final String tablename = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ // use a higher timeout value so that we can trigger a page timeout from the scanner
+ // rather than the page filter
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5));
+ String ddl = String.format("CREATE TABLE %s (id VARCHAR NOT NULL, k1 INTEGER NOT NULL, "
+ + "k2 INTEGER NOT NULL, k3 INTEGER, v1 VARCHAR CONSTRAINT pk PRIMARY KEY (id, k1, k2)) "
+ + "\"%s\" = true", tablename, USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(ddl);
+ String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+ PreparedStatement ps = conn.prepareStatement(dml);
+ int totalRows = 10000;
+ for (int i = 0; i < totalRows; ++i) {
+ ps.setString(1, "id_" + i % 3);
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i);
+ ps.setInt(4, i % 10);
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ if (i != 0 && i % 100 == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ int rowKeyCount = 1000;
+ List<String> inList =
+ Stream.generate(() -> "(?, ?, ?)").limit(rowKeyCount).collect(Collectors.toList());
+ String dql = String.format("select id, k1, k2 from %s where (id, k1, k2) IN (%s)", tablename,
+ String.join(",", inList));
+ ps = conn.prepareStatement(dql);
+ int expectedValidRows = 0;
+ for (int i = 0; i < rowKeyCount; i++) {
+ ps.setString(3 * i + 1, "id_" + i % 3);
+ if (RAND.nextBoolean()) {
+ ++expectedValidRows;
+ ps.setInt(3 * i + 2, i % 20);
+ } else {
+ // generate a non-existing row key
+ ps.setInt(3 * i + 2, 78123);
+ }
+ ps.setInt(3 * i + 3, i);
+ }
+ int actualValidRows = 0;
+ try (ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ ++actualValidRows;
+ }
+ assertEquals(expectedValidRows, actualValidRows);
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED);
+ assertEquals(expectedValidRows, numRows);
+ long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
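+ // the 5 ms page timeout forces the point lookups to be spread across multiple scan RPCs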
+ assertTrue(numRpcs > 1);
+ }
+ }
+ }
+
+ @Test
+ public void testPagingWithTTLMasking() throws Exception {
+ final String tablename = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ // use a higher timeout value so that we can trigger a page timeout from the scanner
+ // rather than the page filter
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(10));
+ int ttl = 2; // 2 seconds
+ String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" +
"k1 INTEGER NOT NULL,\n"
+ + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+ + "CONSTRAINT pk PRIMARY KEY (id, k1, k2)) TTL=" + ttl;
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(ddl);
+ String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+ PreparedStatement ps = conn.prepareStatement(dml);
+ int totalRows = 10000;
+ for (int i = 0; i < totalRows; ++i) {
+ ps.setString(1, "id_" + i % 3);
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i);
+ ps.setInt(4, i % 10);
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ if (i != 0 && i % 100 == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ // Sleep so that the rows expire
+ // Can't use EnvironmentEdgeManager because that messes up page timeout calculations
+ Thread.sleep(ttl * 1000 + 50);
+ String dql = String.format("SELECT count(*) from %s where id = '%s'", tablename, "id_2");
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertTrue(rs.next());
+ assertEquals(0, rs.getInt(1));
+ assertFalse(rs.next());
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // multiple scan rpcs will be executed for every page timeout
+ assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+ }
+ // Insert a few more rows
+ int additionalRows = 5;
+ for (int i = 0; i < additionalRows; ++i) {
+ ps.setString(1, "id_2");
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i + totalRows);
+ ps.setInt(4, i % 10);
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ }
+ conn.commit();
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertTrue(rs.next());
+ assertEquals(additionalRows, rs.getInt(1));
+ assertFalse(rs.next());
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // multiple scan rpcs will be executed for every page timeout
+ assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+ }
+ }
+ }
+
+ @Test
+ public void testPagingWithUnverifiedIndexRows() throws Exception {
+ final String tablename = generateUniqueName();
+ final String indexname = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ // use a higher timeout value so that we can trigger a page timeout from the scanner
+ // rather than the page filter
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5));
+ String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" +
"k1 INTEGER NOT NULL,\n"
+ + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+ + "CONSTRAINT pk PRIMARY KEY (id, k1, k2))";
+ String indexddl = "CREATE INDEX " + indexname + " ON " + tablename + "(k3) include(v1)";
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.createStatement().execute(ddl);
+ conn.createStatement().execute(indexddl);
+ String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+ PreparedStatement ps = conn.prepareStatement(dml);
+ int totalRows = 10000;
+ for (int i = 0; i < totalRows; ++i) {
+ ps.setString(1, "id_" + i % 3);
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i);
+ ps.setInt(4, i % 10);
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ if (i != 0 && i % 100 == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ String dql = String.format("SELECT count(*) from %s where k3 = 5", tablename);
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(indexname));
+ assertTrue(rs.next());
+ assertEquals(totalRows / 10, rs.getInt(1));
+ assertFalse(rs.next());
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // multiple scan rpcs will be executed for every page timeout
+ assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+ }
+ // Insert a few unverified index rows by failing phase 2
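+ // (the index rows are written first and only marked verified after the data table write
+ // succeeds, so failing the data table update leaves the new index rows unverified)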
+ int additionalRows = 10;
+ for (int i = 0; i < additionalRows; ++i) {
+ ps.setString(1, "id_2");
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i + totalRows);
+ ps.setInt(4, 5); // set k3=5
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ }
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ try {
+ commitWithException(conn);
+ } finally {
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ }
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(indexname));
+ assertTrue(rs.next());
+ assertEquals(totalRows / 10, rs.getInt(1));
+ assertFalse(rs.next());
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ // multiple scan rpcs will be executed for every page timeout
+ assertTrue(String.format("Got %d", numRpc), numRpc > 1);
}
}
}
@@ -481,24 +692,46 @@ public class ServerPagingIT extends ParallelStatsDisabledIT {
PhoenixStatement stmt =
conn.createStatement().unwrap(PhoenixStatement.class);
stmt.execute(
"CREATE TABLE " + tableName + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z
UNSIGNED_LONG)");
- for (int i = 1; i <= 200; i++) {
+ final int rowCount = 200;
+ for (int i = 1; i <= rowCount; i++) {
String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
stmt.execute(sql);
}
conn.commit();
// delete every alternate row
- for (int i = 1; i <= 200; i = i + 2) {
+ for (int i = 1; i <= rowCount; i = i + 2) {
stmt.execute("DELETE FROM " + tableName + " WHERE A = " + i);
conn.commit();
}
+ // full table scan
ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
while (rs.next()) {
}
Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
- Assert.assertEquals(101, numRpc);
+ // with 0ms page timeout every row whether it is valid or a delete marker will generate a page
+ // timeout so the number of rpcs will be row count + 1
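+ // here: 100 remaining rows + 100 delete markers = 200 pages, hence 201 scan RPCs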
+ assertEquals(rowCount + 1, numRpc);
+
+ // aggregate query
+ rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
+ assertTrue(rs.next());
+ assertEquals(rowCount / 2, rs.getInt(1));
+ assertFalse(rs.next());
+ metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ assertEquals(rowCount + 1, numRpc);
+
+ // aggregate query with a filter
+ rs = stmt.executeQuery("SELECT count(*) FROM " + tableName + " where Z % 4 = 0");
+ assertTrue(rs.next());
+ assertEquals(rowCount / 4, rs.getInt(1));
+ assertFalse(rs.next());
+ metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+ assertEquals(rowCount + 1, numRpc);
}
@Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index c13e9c4091..ee9acb4aef 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -1661,7 +1661,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
}
}
- static private void commitWithException(Connection conn) {
+ public static void commitWithException(Connection conn) {
try {
conn.commit();
IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
index d8ca7e0c79..9758dd51f9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.monitoring;
import static org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -323,6 +324,37 @@ public class CountRowsScannedIT extends BaseTest {
assertEquals(142, count4);
}
+ @Test
+ public void testLimitOffsetWithoutSplit() throws Exception {
+ final String tablename = generateUniqueName();
+ final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
+ "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+ String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
+ + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "C3.k3
INTEGER,\n"
+ + "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createTestTable(getUrl(), ddl);
+ for (int i = 0; i < 26; i++) {
+ conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "',"
+ + i + "," + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
+ }
+ conn.commit();
+ int limit = 12;
+ int offset = 5;
+ ResultSet rs;
+ rs = conn.createStatement().executeQuery(
+ "SELECT t_id from " + tablename + " order by t_id limit " + limit + "
offset " + offset);
+ int i = 0;
+ while (i < limit) {
+ assertTrue(rs.next());
+ assertEquals("Expected string didn't match for i = " + i,
STRINGS[offset + i],
+ rs.getString(1));
+ i++;
+ }
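+ // the scan has to read past the offset rows before it can return the limit rows,
+ // so limit + offset rows are scanned in total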
+ assertEquals(limit + offset, getRowsScanned(rs));
+ }
+ }
+
private long countRowsScannedFromSql(Statement stmt, String sql) throws SQLException {
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {