This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.3 by this push:
     new e1caf95670 PHOENIX-7707 Phoenix server paging on valid rows (#2294)
e1caf95670 is described below

commit e1caf956708f06e71440605b603eed20a094cf55
Author: tkhurana <[email protected]>
AuthorDate: Wed Oct 8 17:51:12 2025 -0700

    PHOENIX-7707 Phoenix server paging on valid rows (#2294)
---
 .../hbase/regionserver/PhoenixScannerContext.java  | 139 ++++++++++++
 .../hbase/regionserver/ScannerContextUtil.java     |  67 ------
 .../coprocessor/BaseScannerRegionObserver.java     |  34 ++-
 .../phoenix/coprocessor/DelegateRegionScanner.java |  15 +-
 .../GroupedAggregateRegionObserver.java            |  26 ++-
 .../phoenix/coprocessor/HashJoinRegionScanner.java |   8 +-
 .../phoenix/coprocessor/PagingRegionScanner.java   |  31 ++-
 .../phoenix/coprocessor/TTLRegionScanner.java      |  25 ++-
 .../coprocessor/UncoveredIndexRegionScanner.java   |  28 ++-
 .../UncoveredLocalIndexRegionScanner.java          |   6 +-
 .../UngroupedAggregateRegionObserver.java          |  11 -
 .../UngroupedAggregateRegionScanner.java           |  34 ++-
 .../apache/phoenix/index/GlobalIndexChecker.java   |   7 +-
 .../iterate/NonAggregateRegionScannerFactory.java  |  65 ++++--
 .../phoenix/iterate/RegionScannerFactory.java      |   6 -
 .../iterate/RegionScannerResultIterator.java       |  16 +-
 .../org/apache/phoenix/end2end/ServerPagingIT.java | 239 ++++++++++++++++++++-
 .../end2end/index/GlobalIndexCheckerIT.java        |   2 +-
 .../phoenix/monitoring/CountRowsScannedIT.java     |  32 +++
 19 files changed, 587 insertions(+), 204 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java
new file mode 100644
index 0000000000..49d10685fc
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/PhoenixScannerContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.phoenix.util.ScanUtil.isDummy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ScannerContext has all methods package visible. To properly update the 
context progress for our
+ * scanners we need this helper
+ */
+public class PhoenixScannerContext extends ScannerContext {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixScannerContext.class);
+
+  // tracks the start time of the rpc on the server for server paging
+  private final long startTime;
+
+  /**
+   * The scanner remains open on the server during the course of multiple scan rpc requests. We need
+   * a way to determine during the next() call if it is a new scan rpc request on the same scanner.
+   * This is needed so that we can reset the start time for server paging. Every scan rpc request
+   * creates a new ScannerContext which has the lastPeekedCell set to null in the beginning.
+   * Subsequent next() calls will set this field in the ScannerContext.
+   */
+  public static boolean isNewScanRpcRequest(ScannerContext scannerContext) {
+    return scannerContext != null && scannerContext.getLastPeekedCell() == null;
+  }
+
+  public PhoenixScannerContext(ScannerContext hbaseContext) {
+    // set limits to null to create no limit context
+    super(Objects.requireNonNull(hbaseContext).keepProgress, null,
+      Objects.requireNonNull(hbaseContext).isTrackingMetrics());
+    startTime = EnvironmentEdgeManager.currentTimeMillis();
+  }
+
+  public PhoenixScannerContext(boolean trackMetrics) {
+    super(false, null, trackMetrics);
+    startTime = EnvironmentEdgeManager.currentTimeMillis();
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void incrementSizeProgress(List<Cell> cells) {
+    for (Cell cell : cells) {
+      super.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), cell.heapSize());
+    }
+  }
+
+  /**
+   * returnImmediately is a private field in ScannerContext and there is no getter API for it, but
+   * the checkTimeLimit API on the ScannerContext will return true if returnImmediately is set.
+   */
+  public boolean isReturnImmediately() {
+    return checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
+  }
+
+  /**
+   * Update the scanner context created by RSRpcServices so that it can act accordingly
+   * @param dst    hbase scanner context created on every new scan rpc request
+   * @param result list of cells to be returned to the client as scan rpc response
+   */
+  public void updateHBaseScannerContext(ScannerContext dst, List<Cell> result) {
+    if (dst == null) {
+      return;
+    }
+    // update last peeked cell
+    dst.setLastPeekedCell(getLastPeekedCell());
+    // update return immediately
+    if (isDummy(result) || isReturnImmediately()) {
+      // when a dummy row is returned by a lower layer, set returnImmediately
+      // on the ScannerContext to force HBase to return a response to the client
+      dst.returnImmediately();
+    }
+    // update metrics
+    if (isTrackingMetrics() && dst.isTrackingMetrics()) {
+      // getMetricsMap call resets the metrics internally
+      for (Map.Entry<String, Long> entry : getMetrics().getMetricsMap().entrySet()) {
+        dst.metrics.addToCounter(entry.getKey(), entry.getValue());
+      }
+    }
+    // update progress
+    dst.setProgress(getBatchProgress(), getDataSizeProgress(), getHeapSizeProgress());
+  }
+
+  public static boolean isTimedOut(ScannerContext context, long pageSizeMs) {
+    if (context == null || !(context instanceof PhoenixScannerContext)) {
+      return false;
+    }
+    PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
+    return EnvironmentEdgeManager.currentTimeMillis() - phoenixScannerContext.startTime
+        > pageSizeMs;
+  }
+
+  /**
+   * Set returnImmediately on the ScannerContext to true, it will have the same behavior as reaching
+   * the time limit. Use this to make RSRpcService.scan return immediately.
+   */
+  public static void setReturnImmediately(ScannerContext context) {
+    if (context == null || !(context instanceof PhoenixScannerContext)) {
+      return;
+    }
+    PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
+    phoenixScannerContext.returnImmediately();
+  }
+
+  public static boolean isReturnImmediately(ScannerContext context) {
+    if (context == null || !(context instanceof PhoenixScannerContext)) {
+      return false;
+    }
+    PhoenixScannerContext phoenixScannerContext = (PhoenixScannerContext) context;
+    return phoenixScannerContext.isReturnImmediately();
+  }
+}
diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
deleted file mode 100644
index 23bf60cde0..0000000000
--- a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.PrivateCellUtil;
-
-/**
- * ScannerContext has all methods package visible. To properly update the context progress for our
- * scanners we need this helper
- */
-public class ScannerContextUtil {
-  public static void incrementSizeProgress(ScannerContext sc, List<Cell> cells) {
-    for (Cell cell : cells) {
-      sc.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell), cell.heapSize());
-    }
-  }
-
-  public static void updateMetrics(ScannerContext src, ScannerContext dst) {
-    if (src != null && dst != null && src.isTrackingMetrics() && dst.isTrackingMetrics()) {
-      for (Map.Entry<String, Long> entry : src.getMetrics().getMetricsMap().entrySet()) {
-        dst.metrics.addToCounter(entry.getKey(), entry.getValue());
-      }
-    }
-  }
-
-  public static ScannerContext copyNoLimitScanner(ScannerContext sc) {
-    return new ScannerContext(sc.keepProgress, null, sc.isTrackingMetrics());
-  }
-
-  public static void updateTimeProgress(ScannerContext sc) {
-    sc.updateTimeProgress();
-  }
-
-  /**
-   * Set returnImmediately on the ScannerContext to true, it will have the same behavior as reaching
-   * the time limit. Use this to make RSRpcService.scan return immediately.
-   */
-  public static void setReturnImmediately(ScannerContext sc) {
-    sc.returnImmediately();
-  }
-
-  /**
-   * returnImmediately is a private field in ScannerContext and there is no getter API on it But the
-   * checkTimeLimit API on the ScannerContext will return true if returnImmediately is set
-   */
-  public static boolean checkTimeLimit(ScannerContext sc) {
-    return sc.checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
-  }
-}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index b0a134151f..723eaee039 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -37,12 +37,12 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanOptions;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -191,6 +191,8 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
     private final Scan scan;
     private final ObserverContext<RegionCoprocessorEnvironment> c;
     private boolean wasOverriden;
+    // tracks the current phoenix scanner context corresponding to the hbase scanner context
+    private PhoenixScannerContext phoenixScannerContext;
 
     public RegionScannerHolder(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
       final RegionScanner scanner) {
@@ -248,10 +250,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-      overrideDelegate();
-      boolean res = super.next(result, scannerContext);
-      ScannerContextUtil.incrementSizeProgress(scannerContext, result);
-      return res;
+      return nextInternal(result, scannerContext, false);
     }
 
     @Override
@@ -262,9 +261,30 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
 
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+      return nextInternal(result, scannerContext, true);
+    }
+
+    private boolean nextInternal(List<Cell> result, ScannerContext scannerContext, boolean isRaw)
+      throws IOException {
       overrideDelegate();
-      boolean res = super.nextRaw(result, scannerContext);
-      ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+      if (scannerContext instanceof PhoenixScannerContext) {
+        // This is an optimization to avoid creating multiple phoenix scanner context objects for
+        // the same scan rpc request when multiple RegionScannerHolder objects are stacked which
+        // happens if multiple coprocs (not scanners) are processing the scan like
+        // UngroupedAggregateRegionObserver and GlobalIndexChecker
+        phoenixScannerContext = (PhoenixScannerContext) scannerContext;
+      } else if (PhoenixScannerContext.isNewScanRpcRequest(scannerContext)) {
+        // An open scanner can process multiple scan rpcs during its lifetime.
+        // We need to create a new phoenix scanner context for every new scan rpc request.
+        phoenixScannerContext = new PhoenixScannerContext(scannerContext);
+      }
+      boolean res = isRaw
+        ? super.nextRaw(result, phoenixScannerContext)
+        : super.next(result, phoenixScannerContext);
+      if (!(scannerContext instanceof PhoenixScannerContext)) {
+        // only update the top level hbase scanner context
+        phoenixScannerContext.updateHBaseScannerContext(scannerContext, result);
+      }
       return res;
     }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index ad64e3ae58..4563f102fd 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.coprocessor;
 
-import static org.apache.phoenix.util.ScanUtil.isDummy;
-
 import java.io.IOException;
 import java.util.List;
 import org.apache.hadoop.hbase.Cell;
@@ -27,7 +25,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 
 public class DelegateRegionScanner implements RegionScanner {
 
@@ -103,17 +100,7 @@ public class DelegateRegionScanner implements RegionScanner {
   private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
     throws IOException {
     if (scannerContext != null) {
-      ScannerContext noLimitContext = ScannerContextUtil.copyNoLimitScanner(scannerContext);
-      boolean hasMore =
-        raw ? delegate.nextRaw(result, noLimitContext) : delegate.next(result, noLimitContext);
-      if (isDummy(result) || ScannerContextUtil.checkTimeLimit(noLimitContext)) {
-        // when a dummy row is returned by a lower layer or if the result is valid but the lower
-        // layer signals us to return immediately, we need to set returnImmediately
-        // on the ScannerContext to force HBase to return a response to the client
-        ScannerContextUtil.setReturnImmediately(scannerContext);
-      }
-      ScannerContextUtil.updateMetrics(noLimitContext, scannerContext);
-      return hasMore;
+      return raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext);
     }
     return raw ? delegate.nextRaw(result) : delegate.next(result);
   }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 39d9527a83..b7163c0a3f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -81,7 +82,6 @@ import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
 import org.apache.phoenix.util.PhoenixKeyValueUtil;
@@ -605,8 +605,6 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver
     private boolean nextInternal(List<Cell> resultsToReturn, ScannerContext scannerContext)
       throws IOException {
       boolean hasMore;
-      long startTime = EnvironmentEdgeManager.currentTimeMillis();
-      long now;
       Tuple result =
         useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
       boolean acquiredLock = false;
@@ -643,8 +641,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver
               // Aggregate values here
               aggregators.aggregate(rowAggregators, result);
             }
-            now = EnvironmentEdgeManager.currentTimeMillis();
-            if (hasMore && groupByCache.size() < limit && (now - startTime) >= pageSizeMs) {
+            if (
+              hasMore && groupByCache.size() < limit
+                && (PhoenixScannerContext.isReturnImmediately(scannerContext)
+                  || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs))
+            ) {
               return getDummyResult(resultsToReturn);
             }
           } while (hasMore && groupByCache.size() < limit);
@@ -784,8 +785,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver
       boolean hasMore;
       boolean atLimit;
       boolean aggBoundary = false;
-      long startTime = EnvironmentEdgeManager.currentTimeMillis();
-      long now;
+      boolean pageTimeout = false;
       Tuple result =
         useQualifierAsIndex ? new PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
       ImmutableBytesPtr key = null;
@@ -835,8 +835,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver
             atLimit = rowCount + countOffset >= limit;
             // Do rowCount + 1 b/c we don't have to wait for a complete
             // row in the case of a DISTINCT with a LIMIT
-            now = EnvironmentEdgeManager.currentTimeMillis();
-          } while (hasMore && !aggBoundary && !atLimit && (now - startTime) < pageSizeMs);
+            if (
+              PhoenixScannerContext.isReturnImmediately(scannerContext)
+                || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+            ) {
+              pageTimeout = true;
+              break;
+            }
+          } while (hasMore && !aggBoundary && !atLimit && !pageTimeout);
         }
       } catch (Exception e) {
         LOGGER.error("Ordered group-by scanner next encountered error for region {}",
@@ -850,7 +856,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver
         if (acquiredLock) region.closeRegionOperation();
       }
       try {
-        if (hasMore && !aggBoundary && !atLimit && (now - startTime) >= pageSizeMs) {
+        if (hasMore && !aggBoundary && !atLimit && pageTimeout) {
           updateDummyWithPrevRowKey(results, initStartRowKey, includeInitStartRowKey, scan);
           return true;
         }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index e56d9028f4..ab62a7ddf7 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -59,7 +60,6 @@ import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ClientUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.TupleUtil;
 
 public class HashJoinRegionScanner implements RegionScanner {
@@ -306,7 +306,6 @@ public class HashJoinRegionScanner implements RegionScanner {
   private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
     throws IOException {
     try {
-      long startTime = EnvironmentEdgeManager.currentTimeMillis();
       while (shouldAdvance()) {
         if (scannerContext != null) {
           hasMore =
@@ -322,7 +321,10 @@ public class HashJoinRegionScanner implements RegionScanner {
         }
         Cell cell = result.get(0);
         processResults(result, false);
-        if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+        if (
+          PhoenixScannerContext.isReturnImmediately(scannerContext)
+            || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+        ) {
           byte[] rowKey = CellUtil.cloneRow(cell);
           result.clear();
           getDummyResult(rowKey, result);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
index e3f1d6afea..7cde2a393f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -34,7 +35,6 @@ import org.apache.phoenix.filter.PagingFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,17 +60,16 @@ public class PagingRegionScanner extends BaseRegionScanner {
   private PagingFilter pagingFilter;
   private MultiKeyPointLookup multiKeyPointLookup = null;
   private boolean initialized = false;
+  private long pageSizeMs;
 
   private class MultiKeyPointLookup {
     private SkipScanFilter skipScanFilter;
     private List<KeyRange> pointLookupRanges = null;
     private int lookupPosition = 0;
     private byte[] lookupKeyPrefix = null;
-    private long pageSizeMs;
 
     private MultiKeyPointLookup(SkipScanFilter skipScanFilter) throws IOException {
       this.skipScanFilter = skipScanFilter;
-      pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
       pointLookupRanges = skipScanFilter.getPointLookupKeyRanges();
       lookupPosition = findLookupPosition(scan.getStartRow());
       if (skipScanFilter.getOffset() > 0) {
@@ -133,7 +132,6 @@ public class PagingRegionScanner extends BaseRegionScanner {
     private boolean next(List<Cell> results, boolean raw, RegionScanner scanner,
       ScannerContext scannerContext) throws IOException {
       try {
-        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         while (true) {
           boolean hasMore;
           if (scannerContext != null) {
@@ -152,15 +150,19 @@ public class PagingRegionScanner extends BaseRegionScanner {
               "Each scan is supposed to return only one row, scan " + scan + ", region " + region);
           }
           if (!results.isEmpty()) {
+            if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
+              // we got a valid result but scanner timed out so return immediately
+              PhoenixScannerContext.setReturnImmediately(scannerContext);
+            }
             return hasMore();
           }
           // The scanner returned an empty result. This means that one of the rows
-          // has been deleted.
+          // has been deleted or the row key is not present in the table.
           if (!hasMore()) {
             return false;
           }
 
-          if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs) {
+          if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
             byte[] rowKey = pointLookupRanges.get(lookupPosition - 1).getLowerRange();
             ScanUtil.getDummyResult(rowKey, results);
             return true;
@@ -187,6 +189,7 @@ public class PagingRegionScanner extends BaseRegionScanner {
     this.region = region;
     this.scan = scan;
     pagingFilter = ScanUtil.getPhoenixPagingFilter(scan);
+    pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
   }
 
   @VisibleForTesting
@@ -258,10 +261,11 @@ public class PagingRegionScanner extends BaseRegionScanner {
       }
     }
 
+    boolean hasMore;
     if (multiKeyPointLookup != null) {
       return multiKeyPointLookup.next(results, raw, delegate, scannerContext);
     }
-    boolean hasMore;
+
     if (scannerContext != null) {
       hasMore =
         raw ? delegate.nextRaw(results, scannerContext) : delegate.next(results, scannerContext);
@@ -277,16 +281,23 @@ public class PagingRegionScanner extends BaseRegionScanner {
       if (pagingFilter.isStopped()) {
         if (results.isEmpty()) {
           byte[] rowKey = pagingFilter.getCurrentRowKeyToBeExcluded();
-          LOGGER.info("Page filter stopped, generating dummy key {} ",
-            Bytes.toStringBinary(rowKey));
+          LOGGER.info("{} Paging filter stopped, generating dummy key {} ",
+            getRegionInfo().getRegionNameAsString(), Bytes.toStringBinary(rowKey));
           ScanUtil.getDummyResult(rowKey, results);
+        } else {
+          // we got a valid result but the page filter stopped, so set return immediately
+          PhoenixScannerContext.setReturnImmediately(scannerContext);
         }
         return true;
       }
       return false;
     } else {
       // We got a row from the HBase scanner within the configured time (i.e.,
-      // the page size). We need to start a new page on the next next() call.
+      // the page size).
+      if (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)) {
+        // we got a valid result but scanner timed out so return immediately
+        PhoenixScannerContext.setReturnImmediately(scannerContext);
+      }
       return true;
     }
   }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 8a4fc50299..6ed234db6d 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.schema.CompiledTTLExpression;
@@ -212,7 +213,8 @@ public class TTLRegionScanner extends BaseRegionScanner {
     return false;
   }
 
-  private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore) throws IOException {
+  private boolean skipExpired(List<Cell> result, boolean raw, boolean hasMore,
+    ScannerContext scannerContext) throws IOException {
     boolean expired = isExpired(result);
     if (!expired) {
       return hasMore;
@@ -221,23 +223,28 @@ public class TTLRegionScanner extends BaseRegionScanner {
     if (!hasMore) {
       return false;
     }
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
     do {
-      hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+      hasMore =
+        raw ? delegate.nextRaw(result, scannerContext) : delegate.next(result, scannerContext);
       if (result.isEmpty() || ScanUtil.isDummy(result)) {
-        return hasMore;
+        break;
       }
+      // non dummy result check if it is expired
       if (!isExpired(result)) {
-        return hasMore;
+        break;
       }
+      // result is expired
       Cell cell = result.get(0);
       result.clear();
-      if (EnvironmentEdgeManager.currentTimeMillis() - startTime > pageSizeMs) {
+      if (
+        PhoenixScannerContext.isReturnImmediately(scannerContext)
+          || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+      ) {
         ScanUtil.getDummyResult(CellUtil.cloneRow(cell), result);
-        return hasMore;
+        break;
       }
     } while (hasMore);
-    return false;
+    return hasMore;
   }
 
   private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
@@ -267,7 +274,7 @@ public class TTLRegionScanner extends BaseRegionScanner {
     if (result.isEmpty() || ScanUtil.isDummy(result)) {
       return hasMore;
     }
-    hasMore = skipExpired(result, raw, hasMore);
+    hasMore = skipExpired(result, raw, hasMore, scannerContext);
     if (result.isEmpty() || ScanUtil.isDummy(result)) {
       return hasMore;
     }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 6e1b853bac..1b126a6591 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -210,9 +211,8 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
     }
   }
 
-  protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
-    final byte[] actualStartKey, final int offset, ScannerContext scannerContext)
-    throws IOException {
+  protected boolean scanIndexTableRows(List<Cell> result, final byte[] actualStartKey,
+    final int offset, ScannerContext scannerContext) throws IOException {
     boolean hasMore = false;
     if (actualStartKey != null) {
       do {
@@ -231,7 +231,10 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
             firstCell.getRowLength(), actualStartKey, 0, actualStartKey.length) < 0
         ) {
           result.clear();
-          if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+          if (
+            PhoenixScannerContext.isReturnImmediately(scannerContext)
+              || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+          ) {
             byte[] rowKey = CellUtil.cloneRow(firstCell);
             ScanUtil.getDummyResult(rowKey, result);
             return true;
@@ -270,7 +273,10 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
             viewConstants));
         indexRows.add(row);
         indexRowCount++;
-        if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+        if (
+          hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+            || PhoenixScannerContext.isReturnImmediately(scannerContext))
+        ) {
           getDummyResult(lastIndexRowKey, result);
           // We do not need to change the state, State.SCANNING_INDEX
           // since we will continue scanning the index table after
@@ -283,9 +289,9 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
     return hasMore;
   }
 
-  protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
-    ScannerContext scannerContext) throws IOException {
-    return scanIndexTableRows(result, startTime, null, 0, scannerContext);
+  protected boolean scanIndexTableRows(List<Cell> result, ScannerContext scannerContext)
+    throws IOException {
+    return scanIndexTableRows(result, null, 0, scannerContext);
   }
 
   private boolean verifyIndexRowAndRepairIfNecessary(Result dataRow, byte[] indexRowKey,
@@ -393,7 +399,9 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
    */
   @Override
   public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    long startTime = (scannerContext != null)
+      ? ((PhoenixScannerContext) scannerContext).getStartTime()
+      : EnvironmentEdgeManager.currentTimeMillis();
     boolean hasMore;
     region.startRegionOperation();
     try {
@@ -409,7 +417,7 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
           state = State.SCANNING_INDEX;
         }
         if (state == State.SCANNING_INDEX) {
-          hasMore = scanIndexTableRows(result, startTime, scannerContext);
+          hasMore = scanIndexTableRows(result, scannerContext);
           if (isDummy(result)) {
             updateDummyWithPrevRowKey(result, initStartRowKey, includeInitStartRowKey, scan);
             return hasMore;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index 34d9fb67b9..6b5d124ce0 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -105,9 +105,9 @@ public class UncoveredLocalIndexRegionScanner extends UncoveredIndexRegionScanne
   }
 
   @Override
-  protected boolean scanIndexTableRows(List<Cell> result, final long startTime,
-    ScannerContext scannerContext) throws IOException {
-    return scanIndexTableRows(result, startTime, actualStartKey, offset, scannerContext);
+  protected boolean scanIndexTableRows(List<Cell> result, ScannerContext scannerContext)
+    throws IOException {
+    return scanIndexTableRows(result, actualStartKey, offset, scannerContext);
   }
 
   @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 7fdfdeeb60..d85f800540 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -766,13 +765,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
   private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
     final Region region, final Scan scan, Configuration config) throws IOException {
-    ScannerContext groupScannerContext;
-    if (scan.isScanMetricsEnabled()) {
-      groupScannerContext =
-        ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build();
-    } else {
-      groupScannerContext = null;
-    }
     StatsCollectionCallable callable =
       new StatsCollectionCallable(stats, region, innerScanner, config, scan);
     byte[] asyncBytes =
@@ -826,9 +818,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
 
       @Override
       public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
-        if (groupScannerContext != null && scannerContext != null) {
-          ScannerContextUtil.updateMetrics(groupScannerContext, scannerContext);
-        }
         return next(results);
       }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 20ace9a02e..1a4794e86e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -61,10 +61,10 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.GlobalCache;
@@ -110,7 +110,6 @@ import org.apache.phoenix.transaction.TransactionFactory;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.ClientUtil;
 import org.apache.phoenix.util.EncodedColumnsUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ExpressionUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.LogUtil;
@@ -604,8 +603,6 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
   public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
     throws IOException {
     boolean hasMore;
-    boolean returnImmediately = false;
-    long startTime = EnvironmentEdgeManager.currentTimeMillis();
     Configuration conf = env.getConfiguration();
     final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
     try (MemoryManager.MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
@@ -648,10 +645,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                 resultsToReturn.addAll(results);
                 return true;
               }
-              // we got a dummy result from the lower scanner but hasAny is true which means that
-              // we have a valid result which can be returned to the client instead of a dummy.
-              // We need to signal the RPC handler to return.
-              returnImmediately = true;
+              // we got a page timeout from the lower scanner but hasAny is true which means that
+              // we have a valid result which we can return to the client instead of a dummy but we
+              // still need to finish the rpc and release the handler
+              PhoenixScannerContext.setReturnImmediately(scannerContext);
               break;
             }
             if (!results.isEmpty()) {
@@ -705,13 +702,16 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
               aggregators.aggregate(rowAggregators, result);
               hasAny = true;
             }
-          } while (
-            hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs
-          );
-          if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
-            // we hit a page scanner timeout, signal the RPC handler to return.
-            returnImmediately = true;
-          }
+            if (
+              PhoenixScannerContext.isReturnImmediately(scannerContext)
+                || PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+            ) {
+              // we could have a valid result which we can return to the client instead of a dummy,
+              // but we still need to finish the rpc and release the handler
+              PhoenixScannerContext.setReturnImmediately(scannerContext);
+              break;
+            }
+          } while (hasMore);
           if (!mutations.isEmpty()) {
             if (!isSingleRowDelete) {
               annotateAndCommit(mutations);
@@ -763,10 +763,6 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         }
         resultsToReturn.add(keyValue);
       }
-      if (returnImmediately && scannerContext != null) {
-        // signal the RPC handler to return
-        ScannerContextUtil.setReturnImmediately(scannerContext);
-      }
       return hasMore;
     }
   }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 8e82f17036..97f23c498f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -256,7 +257,6 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg
           init();
           initialized = true;
         }
-        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         do {
           if (raw) {
             hasMore = (scannerContext == null)
@@ -277,7 +277,10 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg
           if (verifyRowAndRepairIfNecessary(result)) {
             break;
           }
-          if (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) >= pageSizeMs) {
+          if (
+            hasMore && (PhoenixScannerContext.isTimedOut(scannerContext, pageSizeMs)
+              || PhoenixScannerContext.isReturnImmediately(scannerContext))
+          ) {
             byte[] rowKey = CellUtil.cloneRow(cell);
             result.clear();
             getDummyResult(rowKey, result);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index cf7832b794..65affd6e79 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -38,10 +38,10 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
@@ -182,7 +182,11 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
         ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan);
       RegionScannerResultIterator iterator = new RegionScannerResultIterator(scan, innerScanner,
         getMinMaxQualifiersFromScan(scan), encodingScheme);
-      ScannerContext sc = iterator.getRegionScannerContext();
+      // we need to create our own scanner context because we are still opening the scanner and
+      // don't have an rpc scanner context which is created in the next() call. This scanner
+      // context is used when we are skipping the rows until we hit the offset
+      PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled());
+      iterator.setRegionScannerContext(sc);
       innerScanner = getOffsetScanner(innerScanner,
         new OffsetResultIterator(iterator, scanOffset, getPageSizeMsForRegionScanner(scan),
           isIncompatibleClient),
@@ -280,10 +284,15 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
         EncodedColumnsUtil.getQualifierEncodingScheme(scan);
       RegionScannerResultIterator inner = new RegionScannerResultIterator(scan, s,
         EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
+      // we need to create our own scanner context because we are still opening the scanner and
+      // don't have an rpc scanner context which is created in the next() call. This scanner
+      // context is used when we are iterating over the top n rows before the first next() call
+      PhoenixScannerContext sc = new PhoenixScannerContext(scan.isScanMetricsEnabled());
+      inner.setRegionScannerContext(sc);
       OrderedResultIterator iterator = new OrderedResultIterator(inner, orderByExpressions,
         spoolingEnabled, thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize,
         getPageSizeMsForRegionScanner(scan), scan, s.getRegionInfo());
-      return new OrderedResultIteratorWithScannerContext(inner.getRegionScannerContext(), iterator);
+      return new OrderedResultIteratorWithScannerContext(sc, iterator);
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
@@ -296,15 +305,15 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
   }
 
   private static class OrderedResultIteratorWithScannerContext {
-    private ScannerContext scannerContext;
+    private PhoenixScannerContext scannerContext;
     private OrderedResultIterator iterator;
 
-    OrderedResultIteratorWithScannerContext(ScannerContext sc, OrderedResultIterator ori) {
+    OrderedResultIteratorWithScannerContext(PhoenixScannerContext sc, OrderedResultIterator ori) {
       this.scannerContext = sc;
       this.iterator = ori;
     }
 
-    public ScannerContext getScannerContext() {
+    public PhoenixScannerContext getScannerContext() {
       return scannerContext;
     }
 
@@ -363,7 +372,7 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
 
   private RegionScanner getOffsetScanner(final RegionScanner s, final OffsetResultIterator iterator,
     final boolean isLastScan, final boolean incompatibleClient, final Scan scan,
-    final ScannerContext sc) throws IOException {
+    final PhoenixScannerContext sc) throws IOException {
     final Tuple firstTuple;
     final Region region = getRegion();
     region.startRegionOperation();
@@ -436,7 +445,9 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
     return new BaseRegionScanner(s) {
       private Tuple tuple = firstTuple;
       private byte[] previousResultRowKey;
-      private ScannerContext regionScannerContext = sc;
+      // scanner context used when we are opening the scanner and skipping up to offset rows
+      // We copy this context to the hbase rpc context on the first next call
+      private PhoenixScannerContext regionScannerContext = sc;
 
       @Override
       public boolean isFilterDone() {
@@ -454,6 +465,18 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
           if (isFilterDone()) {
             return false;
           }
+          if (regionScannerContext != null) {
+            regionScannerContext.updateHBaseScannerContext(scannerContext, results);
+            // we no longer need this context
+            regionScannerContext = null;
+            if (PhoenixScannerContext.isReturnImmediately(scannerContext)) {
+              return true;
+            }
+          }
+          RegionScannerResultIterator delegate =
+            (RegionScannerResultIterator) (iterator.getDelegate());
+          // just use the scanner context passed to us from now on
+          delegate.setRegionScannerContext(scannerContext);
           Tuple nextTuple = iterator.next();
           if (tuple.size() > 0 && !isDummy(tuple)) {
             for (int i = 0; i < tuple.size(); i++) {
@@ -478,10 +501,6 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
             }
           }
           tuple = nextTuple;
-          if (regionScannerContext != null) {
-            ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext);
-            regionScannerContext = null;
-          }
           return !isFilterDone();
         } catch (Throwable t) {
           LOGGER.error("Error while iterating Offset scanner.", t);
@@ -541,7 +560,7 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
    * region) since after this everything is held in memory
    */
   private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s,
-    final OrderedResultIterator iterator, ImmutableBytesPtr tenantId, ScannerContext sc)
+    final OrderedResultIterator iterator, ImmutableBytesPtr tenantId, PhoenixScannerContext sc)
     throws Throwable {
 
     final Tuple firstTuple;
@@ -565,7 +584,9 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
     }
     return new BaseRegionScanner(s) {
       private Tuple tuple = firstTuple;
-      private ScannerContext regionScannerContext = sc;
+      // scanner context used when we are opening the scanner and reading the topN rows
+      // We copy this context to the hbase rpc context on the first next call
+      private PhoenixScannerContext regionScannerContext = sc;
 
       @Override
       public boolean isFilterDone() {
@@ -583,6 +604,14 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
           if (isFilterDone()) {
             return false;
           }
+          if (regionScannerContext != null) {
+            regionScannerContext.updateHBaseScannerContext(scannerContext, results);
+            // we no longer need this context
+            regionScannerContext = null;
+            if (PhoenixScannerContext.isReturnImmediately(scannerContext)) {
+              return true;
+            }
+          }
           if (isDummy(tuple)) {
             ScanUtil.getDummyResult(CellUtil.cloneRow(tuple.getValue(0)), results);
           } else {
@@ -590,11 +619,11 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
               results.add(tuple.getValue(i));
             }
           }
+          RegionScannerResultIterator delegate =
+            (RegionScannerResultIterator) (iterator.getDelegate());
+          // just use the scanner context passed to us from now on
+          delegate.setRegionScannerContext(scannerContext);
           tuple = iterator.next();
-          if (regionScannerContext != null) {
-            ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext);
-            regionScannerContext = null;
-          }
           return !isFilterDone();
         } catch (Throwable t) {
           ClientUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 9645ed413b..bda112e709 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
-import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
@@ -308,11 +307,6 @@ public abstract class RegionScannerFactory {
           if (extraLimit >= 0 && --extraLimit == 0) {
             return false;
           }
-          // There is a scanattribute set to retrieve the specific array element
-          if (scannerContext != null) {
-            ScannerContextUtil.incrementSizeProgress(scannerContext, result);
-            ScannerContextUtil.updateTimeProgress(scannerContext);
-          }
           return next;
         } catch (Throwable t) {
           ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index c0cc6ec84e..adbca1f859 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -43,7 +43,7 @@ public class RegionScannerResultIterator extends BaseResultIterator {
   private final Pair<Integer, Integer> minMaxQualifiers;
   private final boolean useQualifierAsIndex;
   private final QualifierEncodingScheme encodingScheme;
-  private final ScannerContext regionScannerContext;
+  private ScannerContext scannerContext;
 
   public RegionScannerResultIterator(Scan scan, RegionScanner scanner,
     Pair<Integer, Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) {
@@ -51,12 +51,6 @@ public class RegionScannerResultIterator extends BaseResultIterator {
     this.useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
     this.minMaxQualifiers = minMaxQualifiers;
     this.encodingScheme = encodingScheme;
-    if (scan.isScanMetricsEnabled()) {
-      regionScannerContext =
-        ScannerContext.newBuilder().setTrackMetrics(scan.isScanMetricsEnabled()).build();
-    } else {
-      regionScannerContext = null;
-    }
   }
 
   @Override
@@ -74,10 +68,10 @@ public class RegionScannerResultIterator extends BaseResultIterator {
         // since this is an indication of whether or not there are more values after the
         // ones returned
         boolean hasMore;
-        if (regionScannerContext == null) {
+        if (scannerContext == null) {
           hasMore = scanner.nextRaw(results);
         } else {
-          hasMore = scanner.nextRaw(results, regionScannerContext);
+          hasMore = scanner.nextRaw(results, scannerContext);
         }
 
         if (!hasMore && results.isEmpty()) {
@@ -98,8 +92,8 @@ public class RegionScannerResultIterator extends BaseResultIterator {
     }
   }
 
-  public ScannerContext getRegionScannerContext() {
-    return regionScannerContext;
+  public void setRegionScannerContext(ScannerContext scannerContext) {
+    this.scannerContext = scannerContext;
   }
 
   @Override
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
index 1a43161568..b3565f3376 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.end2end;
 
 import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
 import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit;
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.commitWithException;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER;
 import static org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
@@ -33,12 +34,17 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessor.PagingRegionScanner;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.jdbc.PhoenixStatement;
@@ -60,6 +66,7 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 
 @Category(NeedsOwnMiniClusterTest.class)
 public class ServerPagingIT extends ParallelStatsDisabledIT {
+  private static final Random RAND = new Random(11);
 
   @BeforeClass
   public static synchronized void doSetup() throws Exception {
@@ -262,6 +269,210 @@ public class ServerPagingIT extends ParallelStatsDisabledIT {
         assertEquals(D2.getTime(), rs.getDate(1).getTime());
         assertFalse(rs.next());
         assertServerPagingMetric(tablename, rs, true);
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED);
+        assertEquals(6, numRows);
+        long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // 3 regions * (2 rows per region + 1 scanner open with page timeout set to 0 ms)
+        assertEquals(9, numRpcs);
+      }
+    }
+  }
+
+  @Test
+  public void testMultiKeyPointLookup() throws Exception {
+    final String tablename = generateUniqueName();
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    // use a higher timeout value so that we can trigger a page timeout from the scanner
+    // rather than the page filter
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5));
+    String ddl = String.format("CREATE TABLE %s (id VARCHAR NOT NULL, k1 INTEGER NOT NULL, "
+      + "k2 INTEGER NOT NULL, k3 INTEGER, v1 VARCHAR CONSTRAINT pk PRIMARY KEY (id, k1, k2)) "
+      + "\"%s\" = true", tablename, USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP);
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      conn.createStatement().execute(ddl);
+      String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+      PreparedStatement ps = conn.prepareStatement(dml);
+      int totalRows = 10000;
+      for (int i = 0; i < totalRows; ++i) {
+        ps.setString(1, "id_" + i % 3);
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i);
+        ps.setInt(4, i % 10);
+        ps.setString(5, "val");
+        ps.executeUpdate();
+        if (i != 0 && i % 100 == 0) {
+          conn.commit();
+        }
+      }
+      conn.commit();
+      int rowKeyCount = 1000;
+      List<String> inList =
+        Stream.generate(() -> "(?, ?, ?)").limit(rowKeyCount).collect(Collectors.toList());
+      String dql = String.format("select id, k1, k2 from %s where (id, k1, k2) IN (%s)", tablename,
+        String.join(",", inList));
+      ps = conn.prepareStatement(dql);
+      int expectedValidRows = 0;
+      for (int i = 0; i < rowKeyCount; i++) {
+        ps.setString(3 * i + 1, "id_" + i % 3);
+        if (RAND.nextBoolean()) {
+          ++expectedValidRows;
+          ps.setInt(3 * i + 2, i % 20);
+        } else {
+          // generate a non-existing row key
+          ps.setInt(3 * i + 2, 78123);
+        }
+        ps.setInt(3 * i + 3, i);
+      }
+      int actualValidRows = 0;
+      try (ResultSet rs = ps.executeQuery()) {
+        while (rs.next()) {
+          ++actualValidRows;
+        }
+        assertEquals(expectedValidRows, actualValidRows);
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRows = getMetricValue(metrics, MetricType.COUNT_ROWS_SCANNED);
+        assertEquals(expectedValidRows, numRows);
+        long numRpcs = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        assertTrue(numRpcs > 1);
+      }
+    }
+  }
+
+  @Test
+  public void testPagingWithTTLMasking() throws Exception {
+    final String tablename = generateUniqueName();
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    // use a higher timeout value so that we can trigger a page timeout from the scanner
+    // rather than the page filter
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(10));
+    int ttl = 2; // 2 seconds
+    String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+      + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+      + "CONSTRAINT pk PRIMARY KEY (id, k1, k2)) TTL=" + ttl;
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      conn.createStatement().execute(ddl);
+      String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+      PreparedStatement ps = conn.prepareStatement(dml);
+      int totalRows = 10000;
+      for (int i = 0; i < totalRows; ++i) {
+        ps.setString(1, "id_" + i % 3);
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i);
+        ps.setInt(4, i % 10);
+        ps.setString(5, "val");
+        ps.executeUpdate();
+        if (i != 0 && i % 100 == 0) {
+          conn.commit();
+        }
+      }
+      conn.commit();
+      // Sleep so that the rows expire
+      // Can't use EnvironmentEdgeManager because that messes up page timeout calculations
+      Thread.sleep(ttl * 1000 + 50);
+      String dql = String.format("SELECT count(*) from %s where id = '%s'", tablename, "id_2");
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        assertTrue(rs.next());
+        assertEquals(0, rs.getInt(1));
+        assertFalse(rs.next());
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // multiple scan rpcs will be executed for every page timeout
+        assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+      }
+      // Insert a few more rows
+      int additionalRows = 5;
+      for (int i = 0; i < additionalRows; ++i) {
+        ps.setString(1, "id_2");
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i + totalRows);
+        ps.setInt(4, i % 10);
+        ps.setString(5, "val");
+        ps.executeUpdate();
+      }
+      conn.commit();
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        assertTrue(rs.next());
+        assertEquals(additionalRows, rs.getInt(1));
+        assertFalse(rs.next());
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // multiple scan rpcs will be executed for every page timeout
+        assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+      }
+    }
+  }
+
+  @Test
+  public void testPagingWithUnverifiedIndexRows() throws Exception {
+    final String tablename = generateUniqueName();
+    final String indexname = generateUniqueName();
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    // use a higher timeout value so that we can trigger a page timeout from the scanner
+    // rather than the page filter
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(5));
+    String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+      + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+      + "CONSTRAINT pk PRIMARY KEY (id, k1, k2))";
+    String indexddl = "CREATE INDEX " + indexname + " ON " + tablename + "(k3) include(v1)";
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      conn.createStatement().execute(ddl);
+      conn.createStatement().execute(indexddl);
+      String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+      PreparedStatement ps = conn.prepareStatement(dml);
+      int totalRows = 10000;
+      for (int i = 0; i < totalRows; ++i) {
+        ps.setString(1, "id_" + i % 3);
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i);
+        ps.setInt(4, i % 10);
+        ps.setString(5, "val");
+        ps.executeUpdate();
+        if (i != 0 && i % 100 == 0) {
+          conn.commit();
+        }
+      }
+      conn.commit();
+      String dql = String.format("SELECT count(*) from %s where k3 = 5", tablename);
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+        String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+        assertTrue(explainPlan.contains(indexname));
+        assertTrue(rs.next());
+        assertEquals(totalRows / 10, rs.getInt(1));
+        assertFalse(rs.next());
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // multiple scan rpcs will be executed for every page timeout
+        assertTrue(String.format("Got %d", numRpc), numRpc > 1);
+      }
+      // Insert a few unverified index rows by failing phase 2
+      int additionalRows = 10;
+      for (int i = 0; i < additionalRows; ++i) {
+        ps.setString(1, "id_2");
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i + totalRows);
+        ps.setInt(4, 5); // set k3=5
+        ps.setString(5, "val");
+        ps.executeUpdate();
+      }
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+      try {
+        commitWithException(conn);
+      } finally {
+        IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+      }
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+        String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+        assertTrue(explainPlan.contains(indexname));
+        assertTrue(rs.next());
+        assertEquals(totalRows / 10, rs.getInt(1));
+        assertFalse(rs.next());
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+        // multiple scan rpcs will be executed for every page timeout
+        assertTrue(String.format("Got %d", numRpc), numRpc > 1);
       }
     }
   }
@@ -481,24 +692,46 @@ public class ServerPagingIT extends ParallelStatsDisabledIT {
     PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
     stmt.execute(
       "CREATE TABLE " + tableName + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
-    for (int i = 1; i <= 200; i++) {
+    final int rowCount = 200;
+    for (int i = 1; i <= rowCount; i++) {
       String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
       stmt.execute(sql);
     }
     conn.commit();
 
     // delete every alternate row
-    for (int i = 1; i <= 200; i = i + 2) {
+    for (int i = 1; i <= rowCount; i = i + 2) {
       stmt.execute("DELETE FROM " + tableName + " WHERE A = " + i);
       conn.commit();
     }
 
+    // full table scan
     ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
     while (rs.next()) {
     }
     Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
     long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
-    Assert.assertEquals(101, numRpc);
+    // with 0ms page timeout every row whether it is valid or a delete marker will generate a page
+    // timeout so the number of rpcs will be row count + 1
+    assertEquals(rowCount + 1, numRpc);
+
+    // aggregate query
+    rs = stmt.executeQuery("SELECT count(*) FROM " + tableName);
+    assertTrue(rs.next());
+    assertEquals(rowCount / 2, rs.getInt(1));
+    assertFalse(rs.next());
+    metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+    numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+    assertEquals(rowCount + 1, numRpc);
+
+    // aggregate query with a filter
+    rs = stmt.executeQuery("SELECT count(*) FROM " + tableName + " where Z % 4 = 0");
+    assertTrue(rs.next());
+    assertEquals(rowCount / 4, rs.getInt(1));
+    assertFalse(rs.next());
+    metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+    numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+    assertEquals(rowCount + 1, numRpc);
   }
 
   @Test
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index c13e9c4091..ee9acb4aef 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -1661,7 +1661,7 @@ public class GlobalIndexCheckerIT extends BaseTest {
     }
   }
 
-  static private void commitWithException(Connection conn) {
+  public static void commitWithException(Connection conn) {
     try {
       conn.commit();
       IndexRegionObserver.setFailPreIndexUpdatesForTesting(false);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
index d8ca7e0c79..9758dd51f9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.monitoring;
 
 import static org.apache.phoenix.query.QueryServices.USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -323,6 +324,37 @@ public class CountRowsScannedIT extends BaseTest {
     assertEquals(142, count4);
   }
 
+  @Test
+  public void testLimitOffsetWithoutSplit() throws Exception {
+    final String tablename = generateUniqueName();
+    final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n",
+      "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+    String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n"
+      + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n"
+      + "C2.v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2))";
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTestTable(getUrl(), ddl);
+      for (int i = 0; i < 26; i++) {
+        conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "',"
+          + i + "," + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')");
+      }
+      conn.commit();
+      int limit = 12;
+      int offset = 5;
+      ResultSet rs;
+      rs = conn.createStatement().executeQuery(
+        "SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset);
+      int i = 0;
+      while (i < limit) {
+        assertTrue(rs.next());
+        assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i],
+          rs.getString(1));
+        i++;
+      }
+      assertEquals(limit + offset, getRowsScanned(rs));
+    }
+  }
+
  private long countRowsScannedFromSql(Statement stmt, String sql) throws SQLException {
     ResultSet rs = stmt.executeQuery(sql);
     while (rs.next()) {
