This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new bd0750f461 PHOENIX-5117 : Return the count of rows scanned in HBase 
(#1936) (#2088)
bd0750f461 is described below

commit bd0750f461dfa123c91cee412b21eab83e9403b1
Author: Palash Chauhan <palashc...@gmail.com>
AuthorDate: Sat Mar 15 10:15:19 2025 -0700

    PHOENIX-5117 : Return the count of rows scanned in HBase (#1936) (#2088)
---
 .../org/apache/phoenix/compile/QueryCompiler.java  |  29 +-
 .../apache/phoenix/compile/StatementContext.java   |  12 +
 .../org/apache/phoenix/execute/HashJoinPlan.java   |   3 +
 .../phoenix/iterate/MergeSortResultIterator.java   |   5 +-
 .../phoenix/iterate/OffsetResultIterator.java      |   1 -
 .../phoenix/iterate/OrderedResultIterator.java     |   3 +-
 .../org/apache/phoenix/jdbc/PhoenixResultSet.java  |   9 +-
 .../apache/phoenix/optimize/QueryOptimizer.java    |   3 +-
 .../hbase/regionserver/ScannerContextUtil.java     |  26 +-
 .../cache/aggcache/SpillableGroupByCache.java      |   6 +
 .../phoenix/coprocessor/BaseRegionScanner.java     |   7 +-
 .../coprocessor/BaseScannerRegionObserver.java     |   4 +-
 .../phoenix/coprocessor/DelegateRegionScanner.java |  32 +-
 .../GroupedAggregateRegionObserver.java            |  49 ++-
 .../phoenix/coprocessor/HashJoinRegionScanner.java |  28 +-
 .../coprocessor/IndexRebuildRegionScanner.java     |   5 +
 .../coprocessor/IndexRepairRegionScanner.java      |   5 +
 .../phoenix/coprocessor/IndexerRegionScanner.java  |   5 +
 .../phoenix/coprocessor/PagingRegionScanner.java   |  55 +++-
 .../coprocessor/PhoenixTTLRegionObserver.java      |   6 +-
 .../phoenix/coprocessor/TTLRegionScanner.java      |  38 ++-
 .../coprocessor/UncoveredIndexRegionScanner.java   |   5 +
 .../UncoveredLocalIndexRegionScanner.java          |   5 +
 .../UngroupedAggregateRegionObserver.java          |  24 ++
 .../UngroupedAggregateRegionScanner.java           |  17 +-
 .../apache/phoenix/index/GlobalIndexChecker.java   |  19 +-
 .../iterate/NonAggregateRegionScannerFactory.java  |  92 +++++-
 .../phoenix/iterate/RegionScannerFactory.java      |  33 +-
 .../iterate/RegionScannerResultIterator.java       |  23 +-
 .../org/apache/phoenix/end2end/ServerPagingIT.java |  40 +++
 .../phoenix/monitoring/CountRowsScannedIT.java     | 332 +++++++++++++++++++++
 .../hbase/index/covered/NonTxIndexBuilderTest.java |   5 +
 32 files changed, 826 insertions(+), 100 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index eb60562c32..e2be77cea9 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -116,20 +116,29 @@ public class QueryCompiler {
     private final boolean optimizeSubquery;
     private final Map<TableRef, QueryPlan> dataPlans;
     private final boolean costBased;
+    private final StatementContext parentContext;
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, boolean projectTuples, boolean optimizeSubquery, 
Map<TableRef, QueryPlan> dataPlans) throws SQLException {
         this(statement, select, resolver, Collections.<PDatum>emptyList(), 
null, new SequenceManager(statement), projectTuples, optimizeSubquery, 
dataPlans);
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, BindManager bindManager, boolean projectTuples, 
boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws 
SQLException {
-        this(statement, select, resolver, bindManager, 
Collections.<PDatum>emptyList(), null, new SequenceManager(statement), 
projectTuples, optimizeSubquery, dataPlans);
+        this(statement, select, resolver, bindManager, 
Collections.<PDatum>emptyList(), null, new SequenceManager(statement), 
projectTuples, optimizeSubquery, dataPlans, null);
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, List<? extends PDatum> targetColumns, 
ParallelIteratorFactory parallelIteratorFactory, SequenceManager 
sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, 
QueryPlan> dataPlans) throws SQLException {
-        this(statement, select, resolver, new 
BindManager(statement.getParameters()), targetColumns, parallelIteratorFactory, 
sequenceManager, projectTuples, optimizeSubquery, dataPlans);
+        this(statement, select, resolver, new 
BindManager(statement.getParameters()), targetColumns, parallelIteratorFactory, 
sequenceManager, projectTuples, optimizeSubquery, dataPlans, null);
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, BindManager bindManager, List<? extends PDatum> 
targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager 
sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, 
QueryPlan> dataPlans) throws SQLException {
+        this(statement, select, resolver, bindManager, targetColumns, 
parallelIteratorFactory, sequenceManager, projectTuples, optimizeSubquery, 
dataPlans, null);
+    }
+
+    public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, List<? extends PDatum> targetColumns, 
ParallelIteratorFactory parallelIteratorFactory, SequenceManager 
sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, 
QueryPlan> dataPlans, StatementContext parentContext) throws SQLException {
+        this(statement, select, resolver, new 
BindManager(statement.getParameters()), targetColumns, parallelIteratorFactory, 
sequenceManager, projectTuples, optimizeSubquery, dataPlans, parentContext);
+    }
+
+    public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, BindManager bindManager, List<? extends PDatum> 
targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager 
sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, 
QueryPlan> dataPlans, StatementContext parentContext) throws SQLException {
         this.statement = statement;
         this.select = select;
         this.resolver = resolver;
@@ -152,6 +161,7 @@ public class QueryCompiler {
         this.originalScan = ScanUtil.newScan(scan);
         this.optimizeSubquery = optimizeSubquery;
         this.dataPlans = dataPlans == null ? Collections.<TableRef, 
QueryPlan>emptyMap() : dataPlans;
+        this.parentContext = parentContext;
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, 
ColumnResolver resolver, List<? extends PDatum> targetColumns, 
ParallelIteratorFactory parallelIteratorFactory, SequenceManager 
sequenceManager) throws SQLException {
@@ -239,6 +249,9 @@ public class QueryCompiler {
 
     public QueryPlan compileSelect(SelectStatement select) throws SQLException{
         StatementContext context = createStatementContext();
+        if (parentContext != null) {
+            parentContext.addSubStatementContext(context);
+        }
         if (select.isJoin()) {
             JoinTable joinTable = JoinCompiler.compile(statement, select, 
context.getResolver());
             return compileJoinQuery(context, joinTable, false, false, null);
@@ -637,9 +650,13 @@ public class QueryCompiler {
         return type == JoinType.Semi && complete;
     }
 
+    protected QueryPlan compileSubquery(SelectStatement subquery, boolean 
pushDownMaxRows) throws SQLException {
+        return compileSubquery(subquery, pushDownMaxRows, null);
+    }
+
     protected QueryPlan compileSubquery(
             SelectStatement subquerySelectStatement,
-            boolean pushDownMaxRows) throws SQLException {
+            boolean pushDownMaxRows, StatementContext parentContext) throws 
SQLException {
         PhoenixConnection phoenixConnection = this.statement.getConnection();
         RewriteResult rewriteResult =
                 ParseNodeUtil.rewrite(subquerySelectStatement, 
phoenixConnection);
@@ -658,6 +675,9 @@ public class QueryCompiler {
                     statement,
                     queryPlan);
         }
+        if (parentContext != null) {
+            parentContext.addSubStatementContext(queryPlan.getContext());
+        }
         this.statement.setMaxRows(maxRows); // restore maxRows.
         return queryPlan;
     }
@@ -673,14 +693,13 @@ public class QueryCompiler {
             throw new SQLException("RVC Offset not allowed with subqueries.");
         }
 
-        QueryPlan innerPlan = compileSubquery(innerSelect, false);
+        QueryPlan innerPlan = compileSubquery(innerSelect, false, context);
         if (innerPlan instanceof UnionPlan) {
             UnionCompiler.optimizeUnionOrderByIfPossible(
                     (UnionPlan) innerPlan,
                     select,
                     this::createStatementContext);
         }
-
         RowProjector innerQueryPlanRowProjector = innerPlan.getProjector();
         TupleProjector tupleProjector = new 
TupleProjector(innerQueryPlanRowProjector);
 
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
index a7abbd5dd5..d35d1b1530 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.schema.types.PDate;
 import org.apache.phoenix.schema.types.PTime;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.NumberUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -86,6 +87,7 @@ public class StatementContext {
     private boolean isClientSideUpsertSelect;
     private boolean isUncoveredIndex;
     private AtomicBoolean hasFirstValidResult;
+    private Set<StatementContext> subStatementContexts;
     
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
@@ -114,6 +116,7 @@ public class StatementContext {
         this.isClientSideUpsertSelect = context.isClientSideUpsertSelect;
         this.isUncoveredIndex = context.isUncoveredIndex;
         this.hasFirstValidResult = new 
AtomicBoolean(context.getHasFirstValidResult());
+        this.subStatementContexts = Sets.newHashSet();
     }
     /**
      *  Constructor that lets you override whether or not to collect request 
level metrics.
@@ -159,6 +162,7 @@ public class StatementContext {
         this.overAllQueryMetrics = new 
OverAllQueryMetrics(isRequestMetricsEnabled,connection.getLogLevel());
         this.retryingPersistentCache = Maps.<Long, Boolean> newHashMap();
         this.hasFirstValidResult = new AtomicBoolean(false);
+        this.subStatementContexts = Sets.newHashSet();
     }
 
     /**
@@ -390,4 +394,12 @@ public class StatementContext {
             return retrying;
         }
     }
+
+    public void addSubStatementContext(StatementContext sub) {
+        subStatementContexts.add(sub);
+    }
+
+    public Set<StatementContext> getSubStatementContexts() {
+        return subStatementContexts;
+    }
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index fbe5edd161..259d718acb 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -153,6 +153,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
                 QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, 
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
         this.serverCacheLimit = services.getProps().getLongBytes(
                 QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, 
QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
+        for (SubPlan subPlan: subPlans) {
+            
this.getContext().addSubStatementContext(subPlan.getInnerPlan().getContext());
+        }
     }
     
     @Override
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
index d094bec568..8392ab6f48 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -56,7 +56,10 @@ public abstract class MergeSortResultIterator implements 
PeekingResultIterator {
     @Override
     public Tuple next() throws SQLException {
         MaterializedComparableResultIterator iterator = minIterator();
-        if (iterator == null) { return null; }
+        if (iterator == null) {
+            close();
+            return null;
+        }
         Tuple next = iterator.next();
         minHeap.poll();
         if (iterator.peek() != null) {
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index af23429cca..f16bb84948 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
 import static org.apache.phoenix.util.ScanUtil.isDummy;
-
 import 
org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 4e36ae1a2a..3d6713b3b8 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -217,7 +217,8 @@ public class OrderedResultIterator implements 
PeekingResultIterator {
 
     public OrderedResultIterator(ResultIterator delegate,
             List<OrderByExpression> orderByExpressions, boolean 
spoolingEnabled,
-            long thresholdBytes, Integer limit, Integer offset, int 
estimatedRowSize, long pageSizeMs) {
+            long thresholdBytes, Integer limit, Integer offset,
+             int estimatedRowSize, long pageSizeMs) {
         checkArgument(!orderByExpressions.isEmpty());
         this.delegate = delegate;
         this.orderByExpressions = orderByExpressions;
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 4d5f038e0a..fb2fcb3435 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -1480,7 +1480,14 @@ public class PhoenixResultSet implements 
PhoenixMonitoredResultSet, SQLCloseable
 
     @Override
     public Map<String, Map<MetricType, Long>> getReadMetrics() {
-        return readMetricsQueue.aggregate();
+        ReadMetricQueue one = readMetricsQueue;
+        if (context != null) {
+            for (StatementContext sub : context.getSubStatementContexts()) {
+                ReadMetricQueue subMetric = sub.getReadMetricsQueue();
+                one.combineReadMetrics(subMetric);
+            }
+        }
+        return one.aggregate();
     }
 
     @Override
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index faec49322b..42cf622824 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -195,7 +195,8 @@ public class QueryOptimizer {
                 dataPlan.getContext().getSequenceManager(),
                 true,
                 true,
-                dataPlans);
+                dataPlans,
+                dataPlan.getContext());
         return Collections.singletonList(compiler.compile());
     }
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
 
b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
index 3103457325..7bdb2c0b8b 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.regionserver;
 
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 
 /**
@@ -36,4 +36,28 @@ public class ScannerContextUtil {
                     cell.heapSize());
         }
     }
+
+    public static void updateMetrics(ScannerContext src, ScannerContext dst) {
+        if (src != null && dst != null && src.isTrackingMetrics() && 
dst.isTrackingMetrics()) {
+            for (Map.Entry<String, Long> entry : 
src.getMetrics().getMetricsMap().entrySet()) {
+                dst.metrics.addToCounter(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    public static ScannerContext copyNoLimitScanner(ScannerContext sc) {
+        return new ScannerContext(sc.keepProgress, null, 
sc.isTrackingMetrics());
+    }
+
+    public static void updateTimeProgress(ScannerContext sc) {
+        sc.updateTimeProgress();
+    }
+
+    /**
+     * Set returnImmediately on the ScannerContext to true; this has the 
same behavior
+     * as reaching the time limit. Use this to make RSRpcServices.scan return 
immediately.
+     */
+    public static void setReturnImmediately(ScannerContext sc) {
+        sc.returnImmediately();
+    }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index a753365692..f677b081f2 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
@@ -365,6 +366,11 @@ public class SpillableGroupByCache implements GroupByCache 
{
                 }
             }
 
+            public boolean next(List<Cell> result, ScannerContext 
scannerContext)
+                    throws IOException {
+                return next(result);
+            }
+
             @Override
             public boolean next(List<Cell> results) throws IOException {
                 if (!cacheIter.hasNext()) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index 8533bb6391..23a9f075c4 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -42,9 +42,8 @@ public abstract class BaseRegionScanner extends 
DelegateRegionScanner {
     public abstract boolean next(List<Cell> results) throws IOException;
 
     @Override
-    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
-    }
+    public abstract boolean next(List<Cell> result, ScannerContext 
scannerContext)
+            throws IOException;
 
     @Override
     public boolean reseek(byte[] row) throws IOException {
@@ -58,6 +57,6 @@ public abstract class BaseRegionScanner extends 
DelegateRegionScanner {
 
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        throw new IOException("NextRaw with scannerContext should not be 
called in Phoenix environment");
+        return next(result, scannerContext);
     }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 52062aa3de..d893f7f455 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -237,7 +237,7 @@ abstract public class BaseScannerRegionObserver implements 
RegionObserver {
             @Override
             public boolean next(List<Cell> result, ScannerContext 
scannerContext) throws IOException {
                 overrideDelegate();
-                boolean res = super.next(result);
+                boolean res = super.next(result, scannerContext);
                 ScannerContextUtil.incrementSizeProgress(scannerContext, 
result);
                 return res;
             }
@@ -251,7 +251,7 @@ abstract public class BaseScannerRegionObserver implements 
RegionObserver {
             @Override
             public boolean nextRaw(List<Cell> result, ScannerContext 
scannerContext) throws IOException {
                 overrideDelegate();
-                boolean res = super.nextRaw(result);
+                boolean res = super.nextRaw(result, scannerContext);
                 ScannerContextUtil.incrementSizeProgress(scannerContext, 
result);
                 return res;
             }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index bbd51f3396..3d74243191 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
+
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 public class DelegateRegionScanner implements RegionScanner {
 
@@ -61,22 +64,22 @@ public class DelegateRegionScanner implements RegionScanner 
{
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
+        return next(result, false, scannerContext);
     }
 
     @Override
     public boolean next(List<Cell> result) throws IOException {
-        return delegate.next(result);
+        return next(result, false, null);
     }
 
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        throw new IOException("NextRaw with scannerContext should not be 
called in Phoenix environment");
+        return next(result, true, scannerContext);
     }
 
     @Override
-    public boolean nextRaw(List<Cell> arg0) throws IOException {
-        return delegate.nextRaw(arg0);
+    public boolean nextRaw(List<Cell> result) throws IOException {
+        return next(result, true, null);
     }
 
     @Override
@@ -97,4 +100,23 @@ public class DelegateRegionScanner implements RegionScanner 
{
             throw new DoNotRetryIOException(e);
         }
     }
+
+    private boolean next(List<Cell> result, boolean raw, ScannerContext 
scannerContext)
+            throws IOException {
+        if (scannerContext != null) {
+            ScannerContext noLimitContext = ScannerContextUtil
+                    .copyNoLimitScanner(scannerContext);
+            boolean hasMore = raw
+                    ? delegate.nextRaw(result, noLimitContext)
+                    : delegate.next(result, noLimitContext);
+            if (isDummy(result)) {
+                // when a dummy row is returned by a lower layer, set 
returnImmediately
+                // on the ScannerContext to force HBase to return a response 
to the client
+                ScannerContextUtil.setReturnImmediately(scannerContext);
+            }
+            ScannerContextUtil.updateMetrics(noLimitContext, scannerContext);
+            return hasMore;
+        }
+        return raw ? delegate.nextRaw(result) : delegate.next(result);
+    }
 }
\ No newline at end of file
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 29e8c0a147..f36412f03f 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
@@ -352,6 +353,11 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
             return new BaseRegionScanner(s) {
                 private int index = 0;
 
+                public boolean next(List<Cell> result, ScannerContext 
scannerContext)
+                        throws IOException {
+                    return next(result);
+                }
+
                 @Override
                 public void close() throws IOException {
                     try {
@@ -482,8 +488,20 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
             }
         }
 
+        @Override
+        public boolean nextRaw(List<Cell> results, ScannerContext 
scannerContext)
+                throws IOException {
+            return next(results, scannerContext);
+        }
+
         @Override
         public boolean next(List<Cell> resultsToReturn) throws IOException {
+            return next(resultsToReturn, null);
+        }
+
+        @Override
+        public boolean next(List<Cell> resultsToReturn, ScannerContext 
scannerContext)
+                throws IOException {
             if (firstScan && actualScanStartRowKey != null) {
                 if (scanStartRowKey.length > 0 && 
!ScanUtil.isLocalIndex(scan)) {
                     if (hasRegionMoved()) {
@@ -522,7 +540,7 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
             if (firstScan) {
                 firstScan = false;
             }
-            boolean moreRows = nextInternal(resultsToReturn);
+            boolean moreRows = nextInternal(resultsToReturn, scannerContext);
             if (ScanUtil.isDummy(resultsToReturn)) {
                 return true;
             }
@@ -560,7 +578,7 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
                         // If includeStartRowKey is false and the current 
rowkey is matching
                         // with scanStartRowKey, return the next row result.
                         resultsToReturn.clear();
-                        moreRows = nextInternal(resultsToReturn);
+                        moreRows = nextInternal(resultsToReturn, 
scannerContext);
                         if (ScanUtil.isDummy(resultsToReturn)) {
                             return true;
                         }
@@ -578,7 +596,7 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
                             // If includeStartRowKey is true and the (current 
rowkey + "\0xx") is
                             // matching with scanStartRowKey, return the next 
row result.
                             resultsToReturn.clear();
-                            moreRows = nextInternal(resultsToReturn);
+                            moreRows = nextInternal(resultsToReturn, 
scannerContext);
                             if (ScanUtil.isDummy(resultsToReturn)) {
                                 return true;
                             }
@@ -590,7 +608,7 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
                     }
                     // In the loop, keep iterating through rows.
                     resultsToReturn.clear();
-                    moreRows = nextInternal(resultsToReturn);
+                    moreRows = nextInternal(resultsToReturn, scannerContext);
                     if (ScanUtil.isDummy(resultsToReturn)) {
                         return true;
                     }
@@ -609,7 +627,8 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
          * @return true if more rows exist after this one, false if scanner is 
done.
          * @throws IOException if something goes wrong.
          */
-        private boolean nextInternal(List<Cell> resultsToReturn) throws 
IOException {
+        private boolean nextInternal(List<Cell> resultsToReturn, 
ScannerContext scannerContext)
+                throws IOException {
             boolean hasMore;
             long startTime = EnvironmentEdgeManager.currentTimeMillis();
             long now;
@@ -620,7 +639,7 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
                 acquiredLock = true;
                 synchronized (delegate) {
                     if (regionScanner != null) {
-                        return regionScanner.next(resultsToReturn);
+                        return regionScanner.next(resultsToReturn, 
scannerContext);
                     }
                     do {
                         List<Cell> results = useQualifierAsIndex ?
@@ -632,7 +651,9 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
                         // since this is an indication of whether or not there 
are
                         // more values after the
                         // ones returned
-                        hasMore = delegate.nextRaw(results);
+                        hasMore = (scannerContext == null)
+                                    ? delegate.nextRaw(results)
+                                    : delegate.nextRaw(results, 
scannerContext);
                         if (!results.isEmpty()) {
                             if (isDummy(results)) {
                                 return getDummyResult(resultsToReturn);
@@ -779,8 +800,18 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
             }
         }
 
+        @Override
+        public boolean nextRaw(List<Cell> results, ScannerContext 
scannerContext)
+                throws IOException {
+            return next(results, scannerContext);
+        }
+
         @Override
         public boolean next(List<Cell> results) throws IOException {
+            return next(results, null);
+        }
+        @Override
+        public boolean next(List<Cell> results, ScannerContext scannerContext) 
throws IOException {
             boolean hasMore;
             boolean atLimit;
             boolean aggBoundary = false;
@@ -807,7 +838,9 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver im
                         // since this is an indication of whether or not there
                         // are more values after the
                         // ones returned
-                        hasMore = delegate.nextRaw(kvs);
+                        hasMore = (scannerContext == null)
+                                    ? delegate.nextRaw(kvs)
+                                    : delegate.nextRaw(kvs, scannerContext);
                         if (!kvs.isEmpty()) {
                             if (isDummy(kvs)) {
                                 updateDummyWithPrevRowKey(results, 
initStartRowKey,
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 487eb86aba..6a2b3459da 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -297,10 +297,26 @@ public class HashJoinRegionScanner implements 
RegionScanner {
 
     @Override
     public boolean nextRaw(List<Cell> result) throws IOException {
+        return next(result, true, null);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result, true, scannerContext);
+    }
+
+
+    private boolean next(List<Cell> result,  boolean raw, ScannerContext 
scannerContext) throws IOException {
         try {
             long startTime = EnvironmentEdgeManager.currentTimeMillis();
             while (shouldAdvance()) {
-                hasMore = scanner.nextRaw(result);
+                if (scannerContext != null) {
+                    hasMore = raw
+                            ? scanner.nextRaw(result, scannerContext)
+                            : scanner.next(result, scannerContext);
+                } else {
+                    hasMore = raw ? scanner.nextRaw(result) : 
scanner.next(result);
+                }
                 if (isDummy(result)) {
                     return true;
                 }
@@ -325,12 +341,6 @@ public class HashJoinRegionScanner implements 
RegionScanner {
         }
     }
 
-    @Override
-    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
-            throws IOException {
-        throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
-    }
-
     @Override
     public boolean reseek(byte[] row) throws IOException {
         return scanner.reseek(row);
@@ -343,12 +353,12 @@ public class HashJoinRegionScanner implements 
RegionScanner {
 
     @Override
     public boolean next(List<Cell> result) throws IOException {
-        throw new IOException("Next should not be used in HashJoin scanner");
+        return next(result, false, null);
     }
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
+        return next(result, false, scannerContext);
     }
 
     @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 9b7402149f..960a1a8cfb 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.Cell;
@@ -429,4 +430,8 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
         results.add(aggKeyValue);
         return hasMore || hasMoreIncr;
     }
+
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index 19707bd533..bc87704e29 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -479,4 +480,8 @@ public class IndexRepairRegionScanner extends 
GlobalIndexRegionScanner {
         results.add(aggKeyValue);
         return hasMore || hasMoreIncr;
     }
+
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index 42e51d6606..0bb4249581 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -50,6 +50,7 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -477,4 +478,8 @@ public class IndexerRegionScanner extends 
GlobalIndexRegionScanner {
     public long getMaxResultSize() {
         return scan.getMaxResultSize();
     }
+
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
index 719ce43731..66aa9598a8 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.PagingFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
@@ -130,12 +131,20 @@ public class PagingRegionScanner extends 
BaseRegionScanner {
         private boolean hasMore() {
             return lookupPosition < pointLookupRanges.size();
         }
-        private boolean next(List<Cell> results, boolean raw, RegionScanner 
scanner)
-                throws IOException {
+        private boolean next(List<Cell> results, boolean raw, RegionScanner 
scanner,
+                             ScannerContext scannerContext) throws IOException 
{
             try {
                 long startTime = EnvironmentEdgeManager.currentTimeMillis();
                 while (true) {
-                    if (raw ? scanner.nextRaw(results) : 
scanner.next(results)) {
+                    boolean hasMore;
+                    if (scannerContext != null) {
+                        hasMore = raw
+                                ? scanner.nextRaw(results, scannerContext)
+                                : scanner.next(results, scannerContext);
+                    } else {
+                        hasMore = raw ? scanner.nextRaw(results) : 
scanner.next(results);
+                    }
+                    if (hasMore) {
                         // Since each scan is supposed to return only one row 
(even when the
                         // start and stop row key are not the same, which 
happens after region
                         // moves or when there are delete markers in the 
table), this should not
@@ -197,7 +206,8 @@ public class PagingRegionScanner extends BaseRegionScanner {
         initialized = true;
     }
 
-    private boolean next(List<Cell> results, boolean raw) throws IOException {
+    private boolean next(List<Cell> results, boolean raw, ScannerContext 
scannerContext)
+            throws IOException {
         init();
         if (pagingFilter != null) {
             pagingFilter.init();
@@ -230,19 +240,26 @@ public class PagingRegionScanner extends 
BaseRegionScanner {
 
         } else {
             if (multiKeyPointLookup != null) {
-               RegionScanner regionScanner = 
multiKeyPointLookup.getNewScanner();
-               if (regionScanner == null) {
-                   return false;
-               }
-               delegate.close();
-               delegate = regionScanner;
+                RegionScanner regionScanner = 
multiKeyPointLookup.getNewScanner();
+                if (regionScanner == null) {
+                    return false;
+                }
+                delegate.close();
+                delegate = regionScanner;
             }
         }
 
         if (multiKeyPointLookup != null) {
-            return multiKeyPointLookup.next(results, raw, delegate);
+            return multiKeyPointLookup.next(results, raw, delegate, 
scannerContext);
+        }
+        boolean hasMore;
+        if (scannerContext != null) {
+            hasMore = raw
+                    ? delegate.nextRaw(results, scannerContext)
+                    : delegate.next(results, scannerContext);
+        } else {
+            hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
         }
-        boolean hasMore = raw ? delegate.nextRaw(results) : 
delegate.next(results);
         if (pagingFilter == null) {
             return hasMore;
         }
@@ -268,12 +285,22 @@ public class PagingRegionScanner extends 
BaseRegionScanner {
 
     @Override
     public boolean next(List<Cell> results) throws IOException {
-        return next(results, false);
+        return next(results, false, null);
     }
 
     @Override
     public boolean nextRaw(List<Cell> results) throws IOException {
-        return next(results, true);
+        return next(results, true, null);
+    }
+
+    @Override
+    public boolean next(List<Cell> results, ScannerContext scannerContext) 
throws IOException {
+        return next(results, false, scannerContext);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> results, ScannerContext scannerContext) 
throws IOException {
+        return next(results, true, scannerContext);
     }
 
     @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
index cc5d416513..810d965120 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
@@ -176,14 +176,12 @@ public class PhoenixTTLRegionObserver extends 
BaseScannerRegionObserver implemen
 
         @Override public boolean next(List<Cell> result, ScannerContext 
scannerContext)
                 throws IOException {
-            throw new IOException(
-                    "next with scannerContext should not be called in Phoenix 
environment");
+            return next(result);
         }
 
         @Override public boolean nextRaw(List<Cell> result, ScannerContext 
scannerContext)
                 throws IOException {
-            throw new IOException(
-                    "NextRaw with scannerContext should not be called in 
Phoenix environment");
+            return nextRaw(result);
         }
 
         @Override public void close() throws IOException {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 57fcc60c44..6a7c5c58fa 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -210,15 +211,32 @@ public class TTLRegionScanner extends BaseRegionScanner {
         return false;
     }
 
-    private boolean next(List<Cell> result, boolean raw) throws IOException {
+    private boolean next(List<Cell> result, boolean raw, ScannerContext 
scannerContext)
+            throws IOException {
+        boolean hasMore;
         if (!isMaskingEnabled) {
-            return raw ? delegate.nextRaw(result) : delegate.next(result);
+            if (scannerContext != null) {
+                hasMore = raw
+                        ? delegate.nextRaw(result, scannerContext)
+                        : delegate.next(result, scannerContext);
+            } else {
+                hasMore = raw ? delegate.nextRaw(result) : 
delegate.next(result);
+            }
+            return hasMore;
         }
         if (!initialized) {
             init();
             initialized = true;
         }
-        boolean hasMore = raw ? delegate.nextRaw(result) : 
delegate.next(result);
+
+        if (scannerContext != null) {
+            hasMore = raw
+                    ? delegate.nextRaw(result, scannerContext)
+                    : delegate.next(result, scannerContext);
+        } else {
+            hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+        }
+
         if (result.isEmpty() || ScanUtil.isDummy(result)) {
             return hasMore;
         }
@@ -235,12 +253,22 @@ public class TTLRegionScanner extends BaseRegionScanner {
 
     @Override
     public boolean next(List<Cell> results) throws IOException {
-        return next(results, false);
+        return next(results, false, null);
     }
 
     @Override
     public boolean nextRaw(List<Cell> results) throws IOException {
-        return next(results, true);
+        return next(results, true, null);
+    }
+
+    @Override
+    public boolean next(List<Cell> results, ScannerContext scannerContext) 
throws IOException {
+        return next(results, false, scannerContext);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> results, ScannerContext scannerContext) 
throws IOException {
+        return next(results, true, scannerContext);
     }
 
     @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index ec61489af7..691b9265ba 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -31,6 +31,7 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.execute.TupleProjector;
@@ -359,6 +360,10 @@ public abstract class UncoveredIndexRegionScanner extends 
BaseRegionScanner {
         }
     }
 
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
+
     /**
      * A page of index rows are scanned and then their corresponding data 
table rows are retrieved
      * from the data table regions in parallel. These data rows are then 
joined with index rows.
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index a69b5a2e89..39b8733a0e 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -25,6 +25,7 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -129,4 +130,8 @@ public class UncoveredLocalIndexRegionScanner extends 
UncoveredIndexRegionScanne
         ServerIndexUtil.wrapResultUsingOffset(result, offset);
         return hasMore;
     }
+
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b0b1d4196d..986c7209ab 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
 import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -750,6 +752,13 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
 
     private RegionScanner collectStats(final RegionScanner innerScanner, 
StatisticsCollector stats,
                                        final Region region, final Scan scan, 
Configuration config) throws IOException {
+        ScannerContext groupScannerContext;
+        if (scan.isScanMetricsEnabled()) {
+            groupScannerContext = ScannerContext.newBuilder()
+                    .setTrackMetrics(scan.isScanMetricsEnabled()).build();
+        } else {
+            groupScannerContext = null;
+        }
         StatsCollectionCallable callable =
                 new StatsCollectionCallable(stats, region, innerScanner, 
config, scan);
         byte[] asyncBytes = 
scan.getAttribute(BaseScannerRegionObserverConstants.RUN_UPDATE_STATS_ASYNC_ATTRIB);
@@ -799,6 +808,21 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                 }
             }
 
+            @Override
+            public boolean next(List<Cell> results, ScannerContext 
scannerContext)
+                    throws IOException {
+                if (groupScannerContext != null && scannerContext != null) {
+                    ScannerContextUtil.updateMetrics(groupScannerContext, 
scannerContext);
+                }
+                return next(results);
+            }
+
+            @Override
+            public boolean nextRaw(List<Cell> results, ScannerContext 
scannerContext)
+                    throws IOException {
+                return next(results, scannerContext);
+            }
+
             @Override
             public boolean next(List<Cell> results) throws IOException {
                 results.add(aggKeyValue);
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 65f76c78e7..dc574c015e 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -63,6 +63,7 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.GlobalCache;
@@ -560,8 +561,20 @@ public class UngroupedAggregateRegionScanner extends 
BaseRegionScanner {
         }
     }
 
+    @Override
+    public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
+            throws IOException {
+        return next(results, scannerContext);
+    }
+
     @Override
     public boolean next(List<Cell> resultsToReturn) throws IOException {
+        return next(resultsToReturn, null);
+    }
+
+    @Override
+    public boolean next(List<Cell> resultsToReturn, ScannerContext 
scannerContext)
+            throws IOException {
         boolean hasMore;
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
         Configuration conf = env.getConfiguration();
@@ -589,7 +602,9 @@ public class UngroupedAggregateRegionScanner extends 
BaseRegionScanner {
                         // Results are potentially returned even when the 
return value of s.next is false
                         // since this is an indication of whether or not there 
are more values after the
                         // ones returned
-                        hasMore = innerScanner.nextRaw(results);
+                        hasMore = (scannerContext == null)
+                            ? innerScanner.nextRaw(results)
+                            : innerScanner.nextRaw(results, scannerContext);
                         if (isDummy(results)) {
                             if (!hasAny) {
                                 resultsToReturn.addAll(results);
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 4dbf5c1f6d..62b8677103 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -252,7 +252,8 @@ public class GlobalIndexChecker extends 
BaseScannerRegionObserver implements Reg
             return true;
         }
 
-        public boolean next(List<Cell> result, boolean raw) throws IOException 
{
+        public boolean next(List<Cell> result, boolean raw, ScannerContext 
scannerContext)
+                throws IOException {
             try {
                 if (!initialized) {
                     init();
@@ -261,9 +262,13 @@ public class GlobalIndexChecker extends 
BaseScannerRegionObserver implements Reg
                 long startTime = EnvironmentEdgeManager.currentTimeMillis();
                 do {
                     if (raw) {
-                        hasMore = scanner.nextRaw(result);
+                        hasMore = (scannerContext == null)
+                                    ? scanner.nextRaw(result)
+                                    : scanner.nextRaw(result, scannerContext);
                     } else {
-                        hasMore = scanner.next(result);
+                        hasMore = (scannerContext == null)
+                                ? scanner.next(result)
+                                : scanner.next(result, scannerContext);
                     }
                     if (result.isEmpty()) {
                         return hasMore;
@@ -298,22 +303,22 @@ public class GlobalIndexChecker extends 
BaseScannerRegionObserver implements Reg
 
         @Override
         public boolean next(List<Cell> result) throws IOException {
-           return next(result, false);
+            return next(result, false, null);
         }
 
         @Override
         public boolean nextRaw(List<Cell> result) throws IOException {
-            return next(result, true);
+            return next(result, true, null);
         }
 
         @Override
         public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-            throw new IOException("next with scannerContext should not be 
called in Phoenix environment");
+            return next(result, false, scannerContext);
         }
 
         @Override
         public boolean nextRaw(List<Cell> result, ScannerContext 
scannerContext) throws IOException {
-            throw new IOException("NextRaw with scannerContext should not be 
called in Phoenix environment");
+            return next(result, true, scannerContext);
         }
 
         @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 12873c9081..ddb04a9c82 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -41,6 +41,8 @@ import 
org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
@@ -172,19 +174,21 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
         if (scanOffset != null) {
             final boolean isIncompatibleClient =
                     
ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan);
+            RegionScannerResultIterator iterator = new 
RegionScannerResultIterator(scan,
+                    innerScanner,
+                    getMinMaxQualifiersFromScan(scan),
+                    encodingScheme);
+            ScannerContext sc = iterator.getRegionScannerContext();
             innerScanner = getOffsetScanner(
                     innerScanner,
                     new OffsetResultIterator(
-                            new RegionScannerResultIterator(
-                                    innerScanner,
-                                    getMinMaxQualifiersFromScan(scan),
-                                    encodingScheme),
+                            iterator,
                             scanOffset,
                             getPageSizeMsForRegionScanner(scan),
                             isIncompatibleClient),
                     scan.getAttribute(QueryConstants.LAST_SCAN) != null,
                     isIncompatibleClient,
-                    scan);
+                    scan, sc);
         }
         boolean spoolingEnabled =
                 env.getConfiguration().getBoolean(
@@ -194,21 +198,22 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
                 env.getConfiguration()
                         
.getLongBytes(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB,
                         
QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES);
-        final OrderedResultIterator iterator =
-                deserializeFromScan(scan, innerScanner, spoolingEnabled, 
thresholdBytes);
+        OrderedResultIteratorWithScannerContext ic
+                = deserializeFromScan(scan, innerScanner, spoolingEnabled, 
thresholdBytes);
+        final OrderedResultIterator iterator = ic.getIterator();
         if (iterator == null) {
             return innerScanner;
         }
         // TODO:the above wrapped scanner should be used here also
-        return getTopNScanner(env, innerScanner, iterator, tenantId);
+        return getTopNScanner(env, innerScanner, iterator, tenantId, 
ic.getScannerContext());
     }
 
     @VisibleForTesting
-    static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner 
s,
-                                                     boolean spoolingEnabled, 
long thresholdBytes) {
+    static OrderedResultIteratorWithScannerContext deserializeFromScan(Scan 
scan, RegionScanner s,
+                                                   boolean spoolingEnabled, 
long thresholdBytes) {
         byte[] topN = 
scan.getAttribute(BaseScannerRegionObserverConstants.TOPN);
         if (topN == null) {
-            return null;
+            return new OrderedResultIteratorWithScannerContext(null, null);
         }
         int clientVersion = ScanUtil.getClientVersion(scan);
         // Client including and after 4.15 and 5.1 are not going to serialize 
thresholdBytes
@@ -239,11 +244,14 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
             }
             PTable.QualifierEncodingScheme encodingScheme =
                     EncodedColumnsUtil.getQualifierEncodingScheme(scan);
-            ResultIterator inner = new RegionScannerResultIterator(s,
+            RegionScannerResultIterator inner = new 
RegionScannerResultIterator(scan, s,
                     EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), 
encodingScheme);
-            return new OrderedResultIterator(inner, orderByExpressions, 
spoolingEnabled,
+            OrderedResultIterator iterator
+                    = new OrderedResultIterator(inner, orderByExpressions, 
spoolingEnabled,
                     thresholdBytes, limit >= 0 ? limit : null, null, 
estimatedRowSize,
                     getPageSizeMsForRegionScanner(scan), scan, 
s.getRegionInfo());
+            return new 
OrderedResultIteratorWithScannerContext(inner.getRegionScannerContext(),
+                    iterator);
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {
@@ -255,6 +263,24 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
         }
     }
 
+    /**
+     * Immutable holder returned by {@code deserializeFromScan}: pairs the
+     * {@link OrderedResultIterator} built from the scan with the {@link ScannerContext}
+     * obtained from the {@code RegionScannerResultIterator} that feeds it.
+     * <p>
+     * Both members may be {@code null}: the iterator is {@code null} when the scan carries
+     * no TOPN attribute, and the scanner context is {@code null} in that case as well or
+     * when the underlying iterator did not create one.
+     */
+    private static class OrderedResultIteratorWithScannerContext {
+        // Assigned once in the constructor and never mutated afterwards.
+        private final ScannerContext scannerContext;
+        private final OrderedResultIterator iterator;
+
+        /**
+         * @param sc  scanner context associated with the iterator, may be {@code null}
+         * @param ori ordered result iterator, may be {@code null}
+         */
+        OrderedResultIteratorWithScannerContext(ScannerContext sc, OrderedResultIterator ori) {
+            this.scannerContext = sc;
+            this.iterator = ori;
+        }
+
+        /** @return the scanner context passed at construction, possibly {@code null} */
+        public ScannerContext getScannerContext() {
+            return scannerContext;
+        }
+
+        /** @return the ordered result iterator passed at construction, possibly {@code null} */
+        public OrderedResultIterator getIterator() {
+            return iterator;
+        }
+    }
+
     private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan 
scan, RegionScanner s,
                                                                           
Set<KeyValueColumnExpression> arrayKVRefs) {
         byte[] specificArrayIdx = 
scan.getAttribute(BaseScannerRegionObserverConstants.SPECIFIC_ARRAY_INDEX);
@@ -295,7 +321,7 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
                                            final OffsetResultIterator iterator,
                                            final boolean isLastScan,
                                            final boolean incompatibleClient,
-                                           final Scan scan)
+                                           final Scan scan, final 
ScannerContext sc)
             throws IOException {
         final Tuple firstTuple;
         final Region region = getRegion();
@@ -372,6 +398,7 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
         return new BaseRegionScanner(s) {
             private Tuple tuple = firstTuple;
             private byte[] previousResultRowKey;
+            private ScannerContext regionScannerContext = sc;
 
             @Override
             public boolean isFilterDone() {
@@ -380,6 +407,12 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
 
             @Override
             public boolean next(List<Cell> results) throws IOException {
+                // Context-free overload: forwards to next(results, scannerContext) with a
+                // null ScannerContext.
+                return next(results, null);
+            }
+
+            @Override
+            public boolean next(List<Cell> results, ScannerContext 
scannerContext)
+                    throws IOException {
                 try {
                     if (isFilterDone()) {
                         return false;
@@ -413,6 +446,10 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
                         }
                     }
                     tuple = nextTuple;
+                    if (regionScannerContext != null) {
+                        ScannerContextUtil.updateMetrics(regionScannerContext, 
scannerContext);
+                        regionScannerContext = null;
+                    }
                     return !isFilterDone();
                 } catch (Throwable t) {
                     LOGGER.error("Error while iterating Offset scanner.", t);
@@ -421,6 +458,12 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
                 }
             }
 
+            // nextRaw shares the implementation of next(results, scannerContext) for this
+            // scanner; it simply forwards both arguments.
+            @Override
+            public boolean nextRaw(List<Cell> results, ScannerContext 
scannerContext)
+                    throws IOException {
+                return next(results, scannerContext);
+            }
+
             @Override
             public void close() throws IOException {
                 try {
@@ -475,7 +518,9 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
      *  since after this everything is held in memory
      */
     private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, 
final RegionScanner s,
-                                         final OrderedResultIterator iterator, 
ImmutableBytesPtr tenantId) throws Throwable {
+                                         final OrderedResultIterator iterator,
+                                         ImmutableBytesPtr tenantId, 
ScannerContext sc)
+            throws Throwable {
 
         final Tuple firstTuple;
         TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
@@ -498,6 +543,7 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
         }
         return new BaseRegionScanner(s) {
             private Tuple tuple = firstTuple;
+            private ScannerContext regionScannerContext = sc;
 
             @Override
             public boolean isFilterDone() {
@@ -506,6 +552,12 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
 
             @Override
             public boolean next(List<Cell> results) throws IOException {
+                // Context-free overload: forwards to next(results, scannerContext) with a
+                // null ScannerContext.
+                return next(results, null);
+            }
+
+            @Override
+            public boolean next(List<Cell> results, ScannerContext 
scannerContext)
+                    throws IOException {
                 try {
                     if (isFilterDone()) {
                         return false;
@@ -518,6 +570,10 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
                         }
                     }
                     tuple = iterator.next();
+                    if (regionScannerContext != null) {
+                        ScannerContextUtil.updateMetrics(regionScannerContext, 
scannerContext);
+                        regionScannerContext = null;
+                    }
                     return !isFilterDone();
                 } catch (Throwable t) {
                     
ClientUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
@@ -525,6 +581,12 @@ public class NonAggregateRegionScannerFactory extends 
RegionScannerFactory {
                 }
             }
 
+            // nextRaw shares the implementation of next(results, scannerContext) for this
+            // scanner; it simply forwards both arguments.
+            @Override
+            public boolean nextRaw(List<Cell> results, ScannerContext 
scannerContext)
+                    throws IOException {
+                return next(results, scannerContext);
+            }
+
             @Override
             public void close() throws IOException {
                 try {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index 112bb5f63e..1b93d7f82f 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -209,8 +209,15 @@ public abstract class RegionScannerFactory {
 
       @Override
       public boolean next(List<Cell> results) throws IOException {
+        // Context-free overload: forwards to next(results, scannerContext) with a null
+        // ScannerContext, which makes that overload call s.next(results).
+        return next(results, null);
+      }
+
+      @Override
+      public boolean next(List<Cell> results, ScannerContext scannerContext) 
throws IOException {
         try {
-          boolean next = s.next(results);
+          boolean next = (scannerContext == null)
+                  ? s.next(results)
+                  : s.next(results, scannerContext);
           if (ScanUtil.isDummy(results)) {
             return true;
           }
@@ -221,10 +228,6 @@ public abstract class RegionScannerFactory {
         }
       }
 
-      @Override
-      public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-          throw new IOException("Next with scannerContext should not be called 
in Phoenix environment");
-      }
 
       @Override
       public void close() throws IOException {
@@ -253,8 +256,15 @@ public abstract class RegionScannerFactory {
 
       @Override
       public boolean nextRaw(List<Cell> result) throws IOException {
+        // Context-free overload: forwards to nextRaw(result, scannerContext) with a null
+        // ScannerContext, which makes that overload call s.nextRaw(result).
+        return nextRaw(result, null);
+      }
+
+      @Override
+      public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
         try {
-          boolean next = s.nextRaw(result);
+          boolean next = (scannerContext == null)
+                  ? s.nextRaw(result)
+                  : s.nextRaw(result, scannerContext);
           if (ScanUtil.isDummy(result)) {
             return true;
           }
@@ -309,6 +319,10 @@ public abstract class RegionScannerFactory {
               return false;
           }
           // There is a scanattribute set to retrieve the specific array 
element
+          if (scannerContext != null) {
+            ScannerContextUtil.incrementSizeProgress(scannerContext, result);
+            ScannerContextUtil.updateTimeProgress(scannerContext);
+          }
           return next;
         } catch (Throwable t) {
           
ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(),
 t);
@@ -369,13 +383,6 @@ public abstract class RegionScannerFactory {
         return new Pair<>(tuple, new byte[0]);
       }
 
-      @Override
-      public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
-          throws IOException {
-        boolean res = next(result);
-        ScannerContextUtil.incrementSizeProgress(scannerContext, result);
-        return res;
-      }
 
       /**
        * When there is a merge in progress while scanning local indexes we 
might get the key values less than scan start row.
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
index b05696e758..d186ef881e 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java
@@ -24,7 +24,9 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.schema.PTable.QualifierEncodingScheme;
 import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList;
@@ -43,12 +45,21 @@ public class RegionScannerResultIterator extends 
BaseResultIterator {
     private final Pair<Integer, Integer> minMaxQualifiers;
     private final boolean useQualifierAsIndex;
     private final QualifierEncodingScheme encodingScheme;
+    private final ScannerContext regionScannerContext;
     
-    public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, 
Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) {
+    /**
+     * Creates an iterator over the rows produced by {@code scanner}.
+     *
+     * @param scan             consulted here only for {@code isScanMetricsEnabled()}
+     * @param scanner          region scanner the iterator reads from
+     * @param minMaxQualifiers qualifier range; also decides {@code useQualifierAsIndex}
+     * @param encodingScheme   column qualifier encoding scheme
+     */
+    public RegionScannerResultIterator(Scan scan, RegionScanner scanner,
+                                       Pair<Integer, Integer> minMaxQualifiers,
+                                       QualifierEncodingScheme encodingScheme) 
{
         this.scanner = scanner;
         this.useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers);
         this.minMaxQualifiers = minMaxQualifiers;
         this.encodingScheme = encodingScheme;
+        // A metrics-tracking ScannerContext is created only when the scan has metrics
+        // enabled; otherwise the field stays null and next() uses the context-free
+        // scanner.nextRaw(results) overload.
+        if (scan.isScanMetricsEnabled()) {
+            regionScannerContext = ScannerContext.newBuilder()
+                    .setTrackMetrics(scan.isScanMetricsEnabled()).build();
+        } else {
+            regionScannerContext = null;
+        }
     }
 
     @Override
@@ -62,7 +73,12 @@ public class RegionScannerResultIterator extends 
BaseResultIterator {
                 // Results are potentially returned even when the return value 
of s.next is false
                 // since this is an indication of whether or not there are 
more values after the
                 // ones returned
-                boolean hasMore = scanner.nextRaw(results);
+                boolean hasMore;
+                if (regionScannerContext == null) {
+                    hasMore = scanner.nextRaw(results);
+                } else {
+                    hasMore = scanner.nextRaw(results, regionScannerContext);
+                }
 
                 if (!hasMore && results.isEmpty()) {
                     return null;
@@ -81,6 +97,9 @@ public class RegionScannerResultIterator extends 
BaseResultIterator {
         }
     }
 
+    /**
+     * @return the {@link ScannerContext} created in the constructor when scan metrics are
+     *         enabled on the scan, or {@code null} when they are not
+     */
+    public ScannerContext getRegionScannerContext() {
+        return this.regionScannerContext;
+    }
        @Override
        public String toString() {
                return "RegionScannerResultIterator [scanner=" + scanner + "]";
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
index b82ac71454..f9fb3fc9e4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -37,6 +37,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.monitoring.MetricType;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.types.PDate;
@@ -427,6 +428,34 @@ public class ServerPagingIT extends 
ParallelStatsDisabledIT {
         }
     }
 
+    /**
+     * Upserts 200 rows, deletes every odd-keyed row (one commit per delete), runs a
+     * full-table scan and checks the total COUNT_RPC_CALLS reported by
+     * {@code PhoenixRuntime.getRequestReadMetricInfo}.
+     */
+    @Test
+    public void testNumberOfRPCsWithPaging() throws SQLException {
+        String tableName = generateUniqueName();
+        // try-with-resources: release the connection and statement even if an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class)) {
+            // insert 200 rows
+            stmt.execute("CREATE TABLE " + tableName
+                    + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
+            for (int i = 1; i <= 200; i++) {
+                String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
+                stmt.execute(sql);
+            }
+            conn.commit();
+
+            // delete every alternate row
+            for (int i = 1; i <= 200; i = i + 2) {
+                stmt.execute("DELETE FROM " + tableName + " WHERE A = " + i);
+                conn.commit();
+            }
+
+            try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName)) {
+                while (rs.next()) {
+                    // drain the result set; only the collected metrics matter
+                }
+                // read the metrics while the result set is still open, as before
+                Map<String, Map<MetricType, Long>> metrics =
+                        PhoenixRuntime.getRequestReadMetricInfo(rs);
+                long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS);
+                Assert.assertEquals(101, numRpc);
+            }
+        }
+    }
+
     private void populateTable(String tableName) throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
         conn.createStatement().execute("create table " + tableName +
@@ -438,4 +467,15 @@ public class ServerPagingIT extends 
ParallelStatsDisabledIT {
         conn.commit();
         conn.close();
     }
+
+    /**
+     * Sums the values recorded for {@code type} across every entry of {@code metrics}.
+     * Entries that have no value for {@code type} contribute nothing; the result is 0 when
+     * no entry has one.
+     */
+    private long getMetricValue(Map<String, Map<MetricType, Long>> metrics, MetricType type) {
+        long total = 0;
+        for (Map<MetricType, Long> byType : metrics.values()) {
+            Long value = byType.get(type);
+            if (value == null) {
+                continue;
+            }
+            total += value;
+        }
+        return total;
+    }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
new file mode 100644
index 0000000000..d074d4a483
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.monitoring;
+
+import static org.junit.Assert.assertEquals;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class CountRowsScannedIT extends BaseTest {
+
+    /**
+     * Starts the test driver with request-level metric collection turned on and a scan
+     * cache size of 10.
+     */
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        final Map<String, String> driverProps = Maps.newHashMapWithExpectedSize(2);
+        // force many rpc calls
+        driverProps.put(QueryServices.SCAN_CACHE_SIZE_ATTRIB, "10");
+        driverProps.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
+        final ReadOnlyProps readOnlyProps = new ReadOnlyProps(driverProps);
+        setUpTestDriver(readOnlyProps);
+    }
+
+    /**
+     * Verifies COUNT_ROWS_SCANNED for a table with a single-column primary key (A) holding
+     * rows 1..100 with Z == A, across key-range, filter, LIMIT, ORDER BY and skip-scan queries.
+     */
+    @Test
+    public void testSinglePrimaryKey() throws Exception {
+        String tableName = generateUniqueName();
+        // try-with-resources: release the connection and statement even if an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class)) {
+            stmt.execute("CREATE TABLE " + tableName
+                    + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
+            for (int i = 1; i <= 100; i++) {
+                String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
+                stmt.execute(sql);
+            }
+            conn.commit();
+
+            // both columns, but primary key 3 to 100, 98 rows
+            long count1 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE A >= 3 AND Z >= 7");
+            assertEquals(98, count1);
+
+            // primary key, 3 to 100, 98 rows
+            long count2 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE A >= 3");
+            assertEquals(98, count2);
+
+            // non-primary key, all rows
+            long count3 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE Z >= 7");
+            assertEquals(100, count3);
+
+            // primary key with limit, the first row
+            long count4 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE A >= 3 limit 1");
+            assertEquals(1, count4);
+
+            // non-primary key with limit, find the first Z >= 7, 1 to 7, 7 rows
+            long count5 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 limit 1");
+            assertEquals(7, count5);
+
+            // primary key with order by primary and limit
+            long count6 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE A >= 3 ORDER BY A limit 1");
+            assertEquals(1, count6);
+
+            // primary key with order by non-primary and limit
+            long count7 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE A >= 3 ORDER BY Z limit 1");
+            assertEquals(98, count7);
+
+            // select non-primary key with order by primary limit
+            long count8 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 ORDER BY A limit 1");
+            assertEquals(7, count8);
+
+            // select non-primary key with order by primary limit desc
+            // scan from the last, 1 row
+            long count9 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 ORDER BY A desc limit 1");
+            assertEquals(1, count9);
+
+            // select non-primary key range with order by primary limit desc
+            // scan from the last (A = 100) down to the first match (A = 60), 41 rows
+            long count10 = countRowsScannedFromSql(stmt, "SELECT A,Z FROM " + tableName
+                    + " WHERE Z >= 7 AND Z <= 60 ORDER BY A desc limit 1");
+            assertEquals(41, count10);
+
+            // select non-primary key with order by non-primary limit, all rows
+            long count11 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 ORDER BY Z limit 1");
+            assertEquals(100, count11);
+
+            // skip scan
+            long count12 = countRowsScannedFromSql(stmt,
+                    "SELECT A,Z FROM " + tableName + " WHERE A in (20, 45, 68, 3)");
+            assertEquals(4, count12);
+        }
+    }
+
+    /**
+     * Verifies COUNT_ROWS_SCANNED for a table with a composite primary key (A, B): 100 rows
+     * with A = (i % 5) + 1 and B = Z = i, queried with key filters, non-key filters and
+     * aggregates.
+     */
+    @Test
+    public void testMultiPrimaryKeys() throws Exception {
+        String tableName = generateUniqueName();
+        // try-with-resources: release the connection and statement even if an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class)) {
+            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName
+                    + " (A UNSIGNED_LONG NOT NULL, B UNSIGNED_LONG NOT NULL, "
+                    + " Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A, B))");
+            for (int i = 1; i <= 100; i++) {
+                String sql = String.format("UPSERT INTO %s VALUES (%d, %d, %d)",
+                        tableName, (i % 5) + 1, i, i);
+                stmt.execute(sql);
+            }
+            conn.commit();
+
+            // pk1 and pk2, only needed rows are scanned
+            long count1 = countRowsScannedFromSql(stmt,
+                    "SELECT A,B,Z FROM " + tableName + " WHERE A >= 2 AND B >= 3");
+            assertEquals(79, count1);
+
+            // pk2, all rows
+            long count2 = countRowsScannedFromSql(stmt,
+                    "SELECT A,B,Z FROM " + tableName + " WHERE B >= 3");
+            assertEquals(100, count2);
+
+            // non-pk,  all rows
+            long count3 = countRowsScannedFromSql(stmt,
+                    "SELECT A,B,Z FROM " + tableName + " WHERE Z >= 7");
+            assertEquals(100, count3);
+
+            // non group aggregate, pk2 only, all rows
+            long count4 = countRowsScannedFromSql(stmt, "SELECT SUM(A) FROM " + tableName
+                    + " WHERE B >= 3");
+            assertEquals(100, count4);
+
+            // pk1 and pk2, group by
+            long count5 = countRowsScannedFromSql(stmt, "SELECT B, SUM(A), SUM(Z) FROM "
+                    + tableName + " WHERE A >= 2 AND B >= 3 GROUP BY B");
+            assertEquals(79, count5);
+
+            // pk1 and pk2, group by, ordered
+            long count6 = countRowsScannedFromSql(stmt, "SELECT B, SUM(A), SUM(Z) FROM "
+                    + tableName + " WHERE A >= 2 AND B >= 3 GROUP BY B ORDER BY B DESC");
+            assertEquals(79, count6);
+        }
+    }
+
+    /**
+     * Verifies COUNT_ROWS_SCANNED for a LIMIT 1 query as a growing prefix of the table is
+     * deleted: after deleting all rows with A &lt; i, the query is expected to report i rows
+     * scanned (presumably the i - 1 deleted rows plus the first live row).
+     */
+    @Test
+    public void testQueryWithDeleteMarkers() throws Exception {
+        String tableName = generateUniqueName();
+        // try-with-resources: release the connection and statement even if an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class)) {
+            stmt.execute("CREATE TABLE " + tableName
+                    + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
+            for (int i = 1; i <= 100; i++) {
+                String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
+                stmt.execute(sql);
+            }
+            conn.commit();
+            String selectQuery = "SELECT A,Z FROM " + tableName + " LIMIT 1";
+            for (int i = 10; i <= 100; i = i + 10) {
+                stmt.execute("DELETE FROM " + tableName + " WHERE A < " + i);
+                conn.commit();
+                long count = countRowsScannedFromSql(stmt, selectQuery);
+                assertEquals(i, count);
+            }
+        }
+    }
+
+    /**
+     * Verifies COUNT_ROWS_SCANNED for a range query on Z when an index on Z (including A)
+     * exists: Z in 50..70 is 21 rows, and the query plan must target the index table.
+     */
+    @Test
+    public void testQueryIndex() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        // try-with-resources: release the connection and statement even if an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class)) {
+            stmt.execute("CREATE TABLE " + tableName
+                    + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)");
+            stmt.execute("CREATE INDEX " + indexName + " ON " + tableName + "(Z) INCLUDE (A)");
+            for (int i = 1; i <= 100; i++) {
+                String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i);
+                stmt.execute(sql);
+            }
+            conn.commit();
+            String selectQuery = "SELECT A FROM " + tableName + " WHERE Z > 49 AND Z < 71";
+            long count = countRowsScannedFromSql(stmt, selectQuery);
+            assertEquals(21, count);
+            // the plan of the query just executed must have been resolved against the index
+            Assert.assertEquals(indexName,
+                    stmt.getQueryPlan().getTableRef().getTable().getTableName().toString());
+        }
+    }
+
+    /**
+     * Verifies COUNT_ROWS_SCANNED for two tables queried individually and then joined on
+     * table1.B = table2.A; the join is expected to report the sum of both sides' scans.
+     */
+    @Test
+    public void testJoin() throws Exception {
+        String tableName1 = generateUniqueName();
+        String tableName2 = generateUniqueName();
+        // try-with-resources: release the connection and statement even if an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class)) {
+            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName1
+                    + " (A UNSIGNED_LONG NOT NULL, B UNSIGNED_LONG NOT NULL, "
+                    + " Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A, B))");
+            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName2
+                    + " (A UNSIGNED_LONG NOT NULL, B UNSIGNED_LONG NOT NULL, "
+                    + " Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A, B))");
+            // table1.B = table2.A
+            for (int i = 1; i <= 100; i++) {
+                // table1.B in [51, 150], table2.A in [1, 100]
+                String sql1 = String.format("UPSERT INTO %s VALUES (%d, %d, %d)",
+                        tableName1, i, i + 50, i);
+                stmt.execute(sql1);
+                String sql2 = String.format("UPSERT INTO %s VALUES (%d, %d, %d)",
+                        tableName2, i, i, i);
+                stmt.execute(sql2);
+            }
+
+            conn.commit();
+
+            // table1
+            long count1 = countRowsScannedFromSql(stmt,
+                    "SELECT * FROM " + tableName1 + " WHERE A >= 40");
+            assertEquals(61, count1);
+
+            // table2, all rows
+            long count2 = countRowsScannedFromSql(stmt,
+                    "SELECT * FROM " + tableName2 + " WHERE B >= 20");
+            assertEquals(100, count2);
+
+            // join: 61 rows from table1 plus 100 rows from table2
+            String sqlJoin = "SELECT X.K, X.VX, Y.VY FROM ( SELECT B AS K, A AS VX FROM "
+                    + tableName1 + " WHERE A >= 40) X JOIN (SELECT A AS K, B AS VY FROM "
+                    + tableName2 + " WHERE B >= 20) Y ON X.K=Y.K";
+            long count3 = countRowsScannedFromSql(stmt, sqlJoin);
+            assertEquals(161, count3);
+        }
+    }
+
+    /**
+     * Verifies COUNT_ROWS_SCANNED for two single-key tables queried individually and then
+     * combined with UNION ALL (with and without GROUP BY); the union is expected to report
+     * the sum of both branches' scans.
+     */
+    @Test
+    public void testUnionAll() throws Exception {
+        String tableName1 = generateUniqueName();
+        String tableName2 = generateUniqueName();
+        // try-with-resources: release the connection and statement even if an assertion fails
+        try (Connection conn = DriverManager.getConnection(getUrl());
+             PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class)) {
+            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName1
+                    + " (A UNSIGNED_LONG NOT NULL, Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A))");
+            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName2
+                    + " (B UNSIGNED_LONG NOT NULL, Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (B))");
+            for (int i = 1; i <= 100; i++) {
+                String sql1 = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName1, i, i);
+                stmt.execute(sql1);
+                String sql2 = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName2, i, i);
+                stmt.execute(sql2);
+            }
+
+            conn.commit();
+
+            // table1, primary key 40 to 100, 61 rows
+            long count1 = countRowsScannedFromSql(stmt,
+                    "SELECT A, Z FROM " + tableName1 + " WHERE A >= 40");
+            assertEquals(61, count1);
+
+            // table2, primary key 20 to 100, 81 rows
+            long count2 = countRowsScannedFromSql(stmt,
+                    "SELECT B, Z FROM " + tableName2 + " WHERE B >= 20");
+            assertEquals(81, count2);
+
+            // union all: 61 + 81 rows
+            String sqlUnionAll = "SELECT SUM(Z) FROM ( SELECT Z FROM " + tableName1
+                    + " WHERE A >= 40 UNION ALL SELECT Z FROM " + tableName2
+                    + " WHERE B >= 20)";
+            long count3 = countRowsScannedFromSql(stmt, sqlUnionAll);
+            assertEquals(142, count3);
+
+            // union all then group by
+            String sqlUnionAllGroupBy = "SELECT K, SUM(Z) FROM ( SELECT A AS K, Z FROM "
+                    + tableName1 + " WHERE A >= 40 UNION ALL SELECT B AS K, Z FROM "
+                    + tableName2 + " WHERE B >= 20) GROUP BY K";
+            long count4 = countRowsScannedFromSql(stmt, sqlUnionAllGroupBy);
+            assertEquals(142, count4);
+        }
+    }
+
+    /**
+     * Executes {@code sql}, drains the result set and returns the rows-scanned total
+     * computed by {@link #getRowsScanned(ResultSet)}.
+     *
+     * @param stmt statement used to run the query
+     * @param sql  query text
+     * @return the summed COUNT_ROWS_SCANNED metric, or -1 when it is unavailable
+     * @throws SQLException if executing or iterating the query fails
+     */
+    private long countRowsScannedFromSql(Statement stmt, String sql) throws SQLException {
+        // Close the result set once the metrics are read so repeated calls on the same
+        // statement do not accumulate open result sets.
+        try (ResultSet rs = stmt.executeQuery(sql)) {
+            while (rs.next()) {
+                // loop to the end
+            }
+            // metrics are read before the result set is closed, as in the original flow
+            return getRowsScanned(rs);
+        }
+    }
+
+    /**
+     * Sums the COUNT_ROWS_SCANNED value over every entry returned by
+     * {@code PhoenixRuntime.getRequestReadMetricInfo(rs)}.
+     *
+     * @return the sum, or -1 when {@code rs} is not a {@link PhoenixResultSet} or no entry
+     *         carries the metric
+     */
+    private long getRowsScanned(ResultSet rs) throws SQLException {
+        if (rs instanceof PhoenixResultSet) {
+            Map<String, Map<MetricType, Long>> readMetrics =
+                    PhoenixRuntime.getRequestReadMetricInfo(rs);
+            boolean found = false;
+            long total = 0;
+            for (Map<MetricType, Long> byType : readMetrics.values()) {
+                Long scanned = byType.get(MetricType.COUNT_ROWS_SCANNED);
+                if (scanned != null) {
+                    found = true;
+                    total += scanned;
+                }
+            }
+            if (found) {
+                return total;
+            }
+        }
+        return -1;
+    }
+}
\ No newline at end of file
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
index 0dfc13e385..eaf4674b67 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.coprocessor.BaseRegionScanner;
@@ -173,6 +174,10 @@ public class NonTxIndexBuilderTest extends 
BaseConnectionlessQueryTest {
                 }
                 return false; // indicate no more results
             }
+
+            // The ScannerContext is ignored by this test scanner: the call is forwarded
+            // to the context-free next(result) overload.
+            @Override
+            public boolean next(List<Cell> result, ScannerContext scannerContext)
+                    throws IOException {
+                return next(result);
+            }
         };
     }
 


Reply via email to