This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
     new bd0750f461 PHOENIX-5117 : Return the count of rows scanned in HBase (#1936) (#2088)
bd0750f461 is described below

commit bd0750f461dfa123c91cee412b21eab83e9403b1
Author: Palash Chauhan <palashc...@gmail.com>
AuthorDate: Sat Mar 15 10:15:19 2025 -0700

    PHOENIX-5117 : Return the count of rows scanned in HBase (#1936) (#2088)
---
 .../org/apache/phoenix/compile/QueryCompiler.java  |  29 +-
 .../apache/phoenix/compile/StatementContext.java   |  12 +
 .../org/apache/phoenix/execute/HashJoinPlan.java   |   3 +
 .../phoenix/iterate/MergeSortResultIterator.java   |   5 +-
 .../phoenix/iterate/OffsetResultIterator.java      |   1 -
 .../phoenix/iterate/OrderedResultIterator.java     |   3 +-
 .../org/apache/phoenix/jdbc/PhoenixResultSet.java  |   9 +-
 .../apache/phoenix/optimize/QueryOptimizer.java    |   3 +-
 .../hbase/regionserver/ScannerContextUtil.java     |  26 +-
 .../cache/aggcache/SpillableGroupByCache.java      |   6 +
 .../phoenix/coprocessor/BaseRegionScanner.java     |   7 +-
 .../coprocessor/BaseScannerRegionObserver.java     |   4 +-
 .../phoenix/coprocessor/DelegateRegionScanner.java |  32 +-
 .../GroupedAggregateRegionObserver.java            |  49 ++-
 .../phoenix/coprocessor/HashJoinRegionScanner.java |  28 +-
 .../coprocessor/IndexRebuildRegionScanner.java     |   5 +
 .../coprocessor/IndexRepairRegionScanner.java      |   5 +
 .../phoenix/coprocessor/IndexerRegionScanner.java  |   5 +
 .../phoenix/coprocessor/PagingRegionScanner.java   |  55 +++-
 .../coprocessor/PhoenixTTLRegionObserver.java      |   6 +-
 .../phoenix/coprocessor/TTLRegionScanner.java      |  38 ++-
 .../coprocessor/UncoveredIndexRegionScanner.java   |   5 +
 .../UncoveredLocalIndexRegionScanner.java          |   5 +
 .../UngroupedAggregateRegionObserver.java          |  24 ++
 .../UngroupedAggregateRegionScanner.java           |  17 +-
 .../apache/phoenix/index/GlobalIndexChecker.java   |  19 +-
 .../iterate/NonAggregateRegionScannerFactory.java  |  92 +++++-
 .../phoenix/iterate/RegionScannerFactory.java      |  33 +-
 .../iterate/RegionScannerResultIterator.java       |  23 +-
 .../org/apache/phoenix/end2end/ServerPagingIT.java |  40 +++
 .../phoenix/monitoring/CountRowsScannedIT.java     | 332 +++++++++++++++++++++
 .../hbase/index/covered/NonTxIndexBuilderTest.java |   5 +
 32 files changed, 826 insertions(+), 100 deletions(-)
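The user-visible effect of the patch, for reviewers who want to try it: HBase's per-scan row counters now surface in Phoenix's request-level read metrics, including rows scanned on behalf of join sub-plans and subqueries (see the getReadMetrics() and HashJoinPlan hunks below). A rough client-side sketch; the JDBC URL and table are placeholders, and it assumes the COUNT_ROWS_SCANNED metric type exercised by the new CountRowsScannedIT:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Map;

    import org.apache.phoenix.monitoring.MetricType;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class RowsScannedExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT * FROM MY_TABLE WHERE V1 > 100")) {
                while (rs.next()) {
                    // drain the result set so the server-side scans complete
                }
                // Per-table request metrics; after this change they also reflect
                // metrics recorded by sub-statement contexts (joins, subqueries).
                Map<String, Map<MetricType, Long>> metrics =
                        PhoenixRuntime.getRequestReadMetricInfo(rs);
                for (Map.Entry<String, Map<MetricType, Long>> e : metrics.entrySet()) {
                    System.out.println(e.getKey() + " rows scanned: "
                            + e.getValue().get(MetricType.COUNT_ROWS_SCANNED));
                }
            }
        }
    }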
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index eb60562c32..e2be77cea9 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -116,20 +116,29 @@ public class QueryCompiler {
     private final boolean optimizeSubquery;
     private final Map<TableRef, QueryPlan> dataPlans;
     private final boolean costBased;
+    private final StatementContext parentContext;
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
         this(statement, select, resolver, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, optimizeSubquery, dataPlans);
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, BindManager bindManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
-        this(statement, select, resolver, bindManager, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples,
-            optimizeSubquery, dataPlans);
+        this(statement, select, resolver, bindManager, Collections.<PDatum>emptyList(), null, new SequenceManager(statement), projectTuples, optimizeSubquery, dataPlans, null);
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
-        this(statement, select, resolver, new BindManager(statement.getParameters()), targetColumns, parallelIteratorFactory, sequenceManager, projectTuples, optimizeSubquery, dataPlans);
+        this(statement, select, resolver, new BindManager(statement.getParameters()), targetColumns, parallelIteratorFactory, sequenceManager, projectTuples, optimizeSubquery, dataPlans, null);
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, BindManager bindManager, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans) throws SQLException {
+        this(statement, select, resolver, bindManager, targetColumns, parallelIteratorFactory, sequenceManager, projectTuples, optimizeSubquery, dataPlans, null);
+    }
+
+    public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans, StatementContext parentContext) throws SQLException {
+        this(statement, select, resolver, new BindManager(statement.getParameters()), targetColumns, parallelIteratorFactory, sequenceManager, projectTuples, optimizeSubquery, dataPlans, parentContext);
+    }
+
+    public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, BindManager bindManager, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, boolean optimizeSubquery, Map<TableRef, QueryPlan> dataPlans, StatementContext parentContext) throws SQLException {
         this.statement = statement;
         this.select = select;
         this.resolver = resolver;
@@ -152,6 +161,7 @@ public class QueryCompiler {
         this.originalScan = ScanUtil.newScan(scan);
         this.optimizeSubquery = optimizeSubquery;
         this.dataPlans = dataPlans == null ? Collections.<TableRef, QueryPlan>emptyMap() : dataPlans;
+        this.parentContext = parentContext;
     }
 
     public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List<? extends PDatum> targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager) throws SQLException {
@@ -239,6 +249,9 @@ public class QueryCompiler {
     public QueryPlan compileSelect(SelectStatement select) throws SQLException{
         StatementContext context = createStatementContext();
+        if (parentContext != null) {
+            parentContext.addSubStatementContext(context);
+        }
         if (select.isJoin()) {
             JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
             return compileJoinQuery(context, joinTable, false, false, null);
@@ -637,9 +650,13 @@ public class QueryCompiler {
         return type == JoinType.Semi && complete;
     }
 
+    protected QueryPlan compileSubquery(SelectStatement subquery, boolean pushDownMaxRows) throws SQLException {
+        return compileSubquery(subquery, pushDownMaxRows, null);
+    }
+
     protected QueryPlan compileSubquery(
             SelectStatement subquerySelectStatement,
-            boolean pushDownMaxRows) throws SQLException {
+            boolean pushDownMaxRows, StatementContext parentContext) throws SQLException {
         PhoenixConnection phoenixConnection = this.statement.getConnection();
         RewriteResult rewriteResult = ParseNodeUtil.rewrite(subquerySelectStatement, phoenixConnection);
@@ -658,6 +675,9 @@ public class QueryCompiler {
                     statement, queryPlan);
         }
+        if (parentContext != null) {
+            parentContext.addSubStatementContext(queryPlan.getContext());
+        }
         this.statement.setMaxRows(maxRows); // restore maxRows.
         return queryPlan;
     }
@@ -673,14 +693,13 @@ public class QueryCompiler {
             throw new SQLException("RVC Offset not allowed with subqueries.");
         }
 
-        QueryPlan innerPlan = compileSubquery(innerSelect, false);
+        QueryPlan innerPlan = compileSubquery(innerSelect, false, context);
         if (innerPlan instanceof UnionPlan) {
             UnionCompiler.optimizeUnionOrderByIfPossible(
                     (UnionPlan) innerPlan, select, this::createStatementContext);
         }
-
         RowProjector innerQueryPlanRowProjector = innerPlan.getProjector();
         TupleProjector tupleProjector = new TupleProjector(innerQueryPlanRowProjector);
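All of the constructor churn above threads one new optional field through QueryCompiler: a parent StatementContext. compileSelect() and compileSubquery() register each child context with that parent, which is what later lets a result set fold subquery metrics into its own totals. A toy model of the relationship (stand-in class, not the Phoenix API):

    import java.util.HashSet;
    import java.util.Set;

    // Simplified model of the bookkeeping added here: every compiled subquery
    // registers its context with its parent, so counters recorded against a
    // child remain reachable from the root statement.
    class CompileContext {
        private final Set<CompileContext> subContexts = new HashSet<>();
        private long rowsScanned;

        void addSubContext(CompileContext sub) {   // cf. addSubStatementContext()
            subContexts.add(sub);
        }

        void recordRowsScanned(long n) {
            rowsScanned += n;
        }

        long totalRowsScanned() {
            long total = rowsScanned;
            for (CompileContext sub : subContexts) {
                total += sub.totalRowsScanned();   // roll up child metrics
            }
            return total;
        }
    }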
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
index a7abbd5dd5..d35d1b1530 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/StatementContext.java
@@ -47,6 +47,7 @@ import org.apache.phoenix.schema.types.PDate;
 import org.apache.phoenix.schema.types.PTime;
 import org.apache.phoenix.schema.types.PTimestamp;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
 import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.NumberUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
@@ -86,6 +87,7 @@ public class StatementContext {
     private boolean isClientSideUpsertSelect;
     private boolean isUncoveredIndex;
     private AtomicBoolean hasFirstValidResult;
+    private Set<StatementContext> subStatementContexts;
 
     public StatementContext(PhoenixStatement statement) {
         this(statement, new Scan());
@@ -114,6 +116,7 @@ public class StatementContext {
         this.isClientSideUpsertSelect = context.isClientSideUpsertSelect;
         this.isUncoveredIndex = context.isUncoveredIndex;
         this.hasFirstValidResult = new AtomicBoolean(context.getHasFirstValidResult());
+        this.subStatementContexts = Sets.newHashSet();
     }
 
     /**
      * Constructor that lets you override whether or not to collect request level metrics.
@@ -159,6 +162,7 @@ public class StatementContext {
         this.overAllQueryMetrics = new OverAllQueryMetrics(isRequestMetricsEnabled,connection.getLogLevel());
         this.retryingPersistentCache = Maps.<Long, Boolean> newHashMap();
         this.hasFirstValidResult = new AtomicBoolean(false);
+        this.subStatementContexts = Sets.newHashSet();
     }
 
     /**
@@ -390,4 +394,12 @@ public class StatementContext {
             return retrying;
         }
     }
+
+    public void addSubStatementContext(StatementContext sub) {
+        subStatementContexts.add(sub);
+    }
+
+    public Set<StatementContext> getSubStatementContexts() {
+        return subStatementContexts;
+    }
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index fbe5edd161..259d718acb 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -153,6 +153,9 @@ public class HashJoinPlan extends DelegateQueryPlan {
                 QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
         this.serverCacheLimit = services.getProps().getLongBytes(
                 QueryServices.MAX_SERVER_CACHE_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_SIZE);
+        for (SubPlan subPlan: subPlans) {
+            this.getContext().addSubStatementContext(subPlan.getInnerPlan().getContext());
+        }
     }
 
     @Override
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
index d094bec568..8392ab6f48 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -56,7 +56,10 @@ public abstract class MergeSortResultIterator implements PeekingResultIterator {
     @Override
     public Tuple next() throws SQLException {
         MaterializedComparableResultIterator iterator = minIterator();
-        if (iterator == null) { return null; }
+        if (iterator == null) {
+            close();
+            return null;
+        }
         Tuple next = iterator.next();
         minHeap.poll();
         if (iterator.peek() != null) {
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index af23429cca..f16bb84948 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -22,7 +22,6 @@ import java.util.List;
 
 import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
 import static org.apache.phoenix.util.ScanUtil.isDummy;
-
 import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 4e36ae1a2a..3d6713b3b8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -217,7 +217,8 @@ public class OrderedResultIterator implements PeekingResultIterator {
     public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions, boolean spoolingEnabled,
-            long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize, long pageSizeMs) {
+            long thresholdBytes, Integer limit, Integer offset,
+            int estimatedRowSize, long pageSizeMs) {
         checkArgument(!orderByExpressions.isEmpty());
         this.delegate = delegate;
         this.orderByExpressions = orderByExpressions;
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
index 4d5f038e0a..fb2fcb3435 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java
@@ -1480,7 +1480,14 @@ public class PhoenixResultSet implements PhoenixMonitoredResultSet, SQLCloseable
     @Override
     public Map<String, Map<MetricType, Long>> getReadMetrics() {
-        return readMetricsQueue.aggregate();
+        ReadMetricQueue one = readMetricsQueue;
+        if (context != null) {
+            for (StatementContext sub : context.getSubStatementContexts()) {
+                ReadMetricQueue subMetric = sub.getReadMetricsQueue();
+                one.combineReadMetrics(subMetric);
+            }
+        }
+        return one.aggregate();
     }
 
     @Override
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
index faec49322b..42cf622824 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java
@@ -195,7 +195,8 @@ public class QueryOptimizer {
                 dataPlan.getContext().getSequenceManager(),
                 true,
                 true,
-                dataPlans);
+                dataPlans,
+                dataPlan.getContext());
         return Collections.singletonList(compiler.compile());
     }
 
diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
index 3103457325..7bdb2c0b8b 100644
--- a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
@@ -20,9 +20,9 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 
 /**
@@ -36,4 +36,28 @@ public class ScannerContextUtil {
                     cell.heapSize());
         }
     }
+
+    public static void updateMetrics(ScannerContext src, ScannerContext dst) {
+        if (src != null && dst != null && src.isTrackingMetrics() && dst.isTrackingMetrics()) {
+            for (Map.Entry<String, Long> entry : src.getMetrics().getMetricsMap().entrySet()) {
+                dst.metrics.addToCounter(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    public static ScannerContext copyNoLimitScanner(ScannerContext sc) {
+        return new ScannerContext(sc.keepProgress, null, sc.isTrackingMetrics());
+    }
+
+    public static void updateTimeProgress(ScannerContext sc) {
+        sc.updateTimeProgress();
+    }
+
+    /**
+     * Set returnImmediately on the ScannerContext to true, it will have the same behavior
+     * as reaching the time limit. Use this to make RSRpcService.scan return immediately.
+     */
+    public static void setReturnImmediately(ScannerContext sc) {
+        sc.returnImmediately();
+    }
 }
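ScannerContextUtil lives in the org.apache.hadoop.hbase.regionserver package because ScannerContext keeps its constructor, metrics and progress fields package-private; this shim is Phoenix's only doorway to them. The intended call pattern for a delegating scanner, sketched with a hypothetical helper (DelegateRegionScanner below is the real user):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;
    import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;

    final class MetricsPreservingNext {
        // Hypothetical helper mirroring the pattern this patch introduces.
        static boolean next(RegionScanner delegate, List<Cell> results,
                ScannerContext rpcContext) throws IOException {
            // Copy only the metrics-tracking flag; batch/size/time limits are
            // dropped so inner scanners are not throttled by the RPC context.
            ScannerContext noLimit = ScannerContextUtil.copyNoLimitScanner(rpcContext);
            boolean hasMore = delegate.next(results, noLimit);
            // Fold whatever the inner scan counted back into the context that
            // HBase reports to the client.
            ScannerContextUtil.updateMetrics(noLimit, rpcContext);
            return hasMore;
        }
    }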
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index a753365692..f677b081f2 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
@@ -365,6 +366,11 @@ public class SpillableGroupByCache implements GroupByCache {
             }
         }
 
+        public boolean next(List<Cell> result, ScannerContext scannerContext)
+                throws IOException {
+            return next(result);
+        }
+
         @Override
         public boolean next(List<Cell> results) throws IOException {
             if (!cacheIter.hasNext()) {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
index 8533bb6391..23a9f075c4 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java
@@ -42,9 +42,8 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner {
     public abstract boolean next(List<Cell> results) throws IOException;
 
     @Override
-    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-        throw new IOException("Next with scannerContext should not be called in Phoenix environment");
-    }
+    public abstract boolean next(List<Cell> result, ScannerContext scannerContext)
+            throws IOException;
 
     @Override
     public boolean reseek(byte[] row) throws IOException {
@@ -58,6 +57,6 @@ public abstract class BaseRegionScanner extends DelegateRegionScanner {
 
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
-        throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
+        return next(result, scannerContext);
     }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 52062aa3de..d893f7f455 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -237,7 +237,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
         @Override
         public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
             overrideDelegate();
-            boolean res = super.next(result);
+            boolean res = super.next(result, scannerContext);
             ScannerContextUtil.incrementSizeProgress(scannerContext, result);
             return res;
         }
@@ -251,7 +251,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver {
         @Override
         public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
             overrideDelegate();
-            boolean res = super.nextRaw(result);
+            boolean res = super.nextRaw(result, scannerContext);
             ScannerContextUtil.incrementSizeProgress(scannerContext, result);
             return res;
         }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index bbd51f3396..3d74243191 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
+
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 public class DelegateRegionScanner implements RegionScanner {
 
@@ -61,22 +64,22 @@
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-        throw new IOException("Next with scannerContext should not be called in Phoenix environment");
+        return next(result, false, scannerContext);
     }
 
     @Override
     public boolean next(List<Cell> result) throws IOException {
-        return delegate.next(result);
+        return next(result, false, null);
     }
 
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
-        throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
+        return next(result, true, scannerContext);
     }
 
     @Override
-    public boolean nextRaw(List<Cell> arg0) throws IOException {
-        return delegate.nextRaw(arg0);
+    public boolean nextRaw(List<Cell> result) throws IOException {
+        return next(result, true, null);
     }
 
     @Override
@@ -97,4 +100,23 @@
             throw new DoNotRetryIOException(e);
         }
     }
+
+    private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
+            throws IOException {
+        if (scannerContext != null) {
+            ScannerContext noLimitContext = ScannerContextUtil
+                    .copyNoLimitScanner(scannerContext);
+            boolean hasMore = raw
+                    ? delegate.nextRaw(result, noLimitContext)
+                    : delegate.next(result, noLimitContext);
+            if (isDummy(result)) {
+                // when a dummy row is returned by a lower layer, set returnImmediately
+                // on the ScannerContext to force HBase to return a response to the client
+                ScannerContextUtil.setReturnImmediately(scannerContext);
+            }
+            ScannerContextUtil.updateMetrics(noLimitContext, scannerContext);
+            return hasMore;
+        }
+        return raw ? delegate.nextRaw(result) : delegate.next(result);
+    }
 }
\ No newline at end of file
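DelegateRegionScanner also shows the other half of the contract: Phoenix emits a "dummy" row as a paging heartbeat when a page deadline expires before a real row is found, and with a live ScannerContext in play the wrapper must make HBase flush that heartbeat to the client rather than keep looping. Isolated, the check looks like this (illustrative fragment built from the calls in this patch):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;
    import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
    import org.apache.phoenix.util.ScanUtil;

    final class DummyRowHandling {
        static void onResult(List<Cell> result, ScannerContext rpcContext)
                throws IOException {
            if (ScanUtil.isDummy(result)) {
                // Behaves like hitting the time limit: the scan RPC responds to
                // the client immediately while the scanner stays open, so the
                // client can resume from the same position.
                ScannerContextUtil.setReturnImmediately(rpcContext);
            }
        }
    }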
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 29e8c0a147..f36412f03f 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
@@ -352,6 +353,11 @@
         return new BaseRegionScanner(s) {
             private int index = 0;
 
+            public boolean next(List<Cell> result, ScannerContext scannerContext)
+                    throws IOException {
+                return next(result);
+            }
+
             @Override
             public void close() throws IOException {
                 try {
@@ -482,8 +488,20 @@
                 }
             }
 
+            @Override
+            public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
+                    throws IOException {
+                return next(results, scannerContext);
+            }
+
             @Override
             public boolean next(List<Cell> resultsToReturn) throws IOException {
+                return next(resultsToReturn, null);
+            }
+
+            @Override
+            public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
+                    throws IOException {
                 if (firstScan && actualScanStartRowKey != null) {
                     if (scanStartRowKey.length > 0 && !ScanUtil.isLocalIndex(scan)) {
                         if (hasRegionMoved()) {
@@ -522,7 +540,7 @@
                 if (firstScan) {
                     firstScan = false;
                 }
-                boolean moreRows = nextInternal(resultsToReturn);
+                boolean moreRows = nextInternal(resultsToReturn, scannerContext);
                 if (ScanUtil.isDummy(resultsToReturn)) {
                     return true;
                 }
@@ -560,7 +578,7 @@
                     // If includeStartRowKey is false and the current rowkey is matching
                     // with scanStartRowKey, return the next row result.
                     resultsToReturn.clear();
-                    moreRows = nextInternal(resultsToReturn);
+                    moreRows = nextInternal(resultsToReturn, scannerContext);
                     if (ScanUtil.isDummy(resultsToReturn)) {
                         return true;
                     }
@@ -578,7 +596,7 @@
                     // If includeStartRowKey is true and the (current rowkey + "\0xx") is
                     // matching with scanStartRowKey, return the next row result.
                     resultsToReturn.clear();
-                    moreRows = nextInternal(resultsToReturn);
+                    moreRows = nextInternal(resultsToReturn, scannerContext);
                     if (ScanUtil.isDummy(resultsToReturn)) {
                         return true;
                     }
@@ -590,7 +608,7 @@
                 }
                 // In the loop, keep iterating through rows.
                 resultsToReturn.clear();
-                moreRows = nextInternal(resultsToReturn);
+                moreRows = nextInternal(resultsToReturn, scannerContext);
                 if (ScanUtil.isDummy(resultsToReturn)) {
                     return true;
                 }
@@ -609,7 +627,8 @@
              * @return true if more rows exist after this one, false if scanner is done.
              * @throws IOException if something goes wrong.
              */
-            private boolean nextInternal(List<Cell> resultsToReturn) throws IOException {
+            private boolean nextInternal(List<Cell> resultsToReturn, ScannerContext scannerContext)
+                    throws IOException {
                 boolean hasMore;
                 long startTime = EnvironmentEdgeManager.currentTimeMillis();
                 long now;
@@ -620,7 +639,7 @@
                     acquiredLock = true;
                     synchronized (delegate) {
                         if (regionScanner != null) {
-                            return regionScanner.next(resultsToReturn);
+                            return regionScanner.next(resultsToReturn, scannerContext);
                         }
                         do {
                             List<Cell> results = useQualifierAsIndex ?
@@ -632,7 +651,9 @@
                             // since this is an indication of whether or not there are
                             // more values after the
                             // ones returned
-                            hasMore = delegate.nextRaw(results);
+                            hasMore = (scannerContext == null)
+                                    ? delegate.nextRaw(results)
+                                    : delegate.nextRaw(results, scannerContext);
                             if (!results.isEmpty()) {
                                 if (isDummy(results)) {
                                     return getDummyResult(resultsToReturn);
@@ -779,8 +800,18 @@
             }
         }
 
+        @Override
+        public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
+                throws IOException {
+            return next(results, scannerContext);
+        }
+
         @Override
         public boolean next(List<Cell> results) throws IOException {
+            return next(results, null);
+        }
+
+        @Override
+        public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
             boolean hasMore;
             boolean atLimit;
             boolean aggBoundary = false;
@@ -807,7 +838,9 @@
                     // since this is an indication of whether or not there
                     // are more values after the
                     // ones returned
-                    hasMore = delegate.nextRaw(kvs);
+                    hasMore = (scannerContext == null)
+                            ? delegate.nextRaw(kvs)
+                            : delegate.nextRaw(kvs, scannerContext);
                     if (!kvs.isEmpty()) {
                         if (isDummy(kvs)) {
                             updateDummyWithPrevRowKey(results, initStartRowKey,
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 487eb86aba..6a2b3459da 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -297,10 +297,26 @@ public class HashJoinRegionScanner implements RegionScanner {
 
     @Override
     public boolean nextRaw(List<Cell> result) throws IOException {
+        return next(result, true, null);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return next(result, true, scannerContext);
+    }
+
+
+    private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext) throws IOException {
         try {
             long startTime = EnvironmentEdgeManager.currentTimeMillis();
             while (shouldAdvance()) {
-                hasMore = scanner.nextRaw(result);
+                if (scannerContext != null) {
+                    hasMore = raw
+                            ? scanner.nextRaw(result, scannerContext)
+                            : scanner.next(result, scannerContext);
+                } else {
+                    hasMore = raw ? scanner.nextRaw(result) : scanner.next(result);
+                }
                 if (isDummy(result)) {
                     return true;
                 }
@@ -325,12 +341,6 @@
         }
     }
 
-    @Override
-    public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
-            throws IOException {
-        throw new IOException("Next with scannerContext should not be called in Phoenix environment");
-    }
-
     @Override
     public boolean reseek(byte[] row) throws IOException {
         return scanner.reseek(row);
@@ -343,12 +353,12 @@
 
     @Override
     public boolean next(List<Cell> result) throws IOException {
-        throw new IOException("Next should not be used in HashJoin scanner");
+        return next(result, false, null);
     }
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-        throw new IOException("Next with scannerContext should not be called in Phoenix environment");
+        return next(result, false, scannerContext);
     }
 
     @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 9b7402149f..960a1a8cfb 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.Cell;
@@ -429,4 +430,8 @@ public class IndexRebuildRegionScanner extends GlobalIndexRegionScanner {
         results.add(aggKeyValue);
         return hasMore || hasMoreIncr;
     }
+
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return next(result);
+    }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index 19707bd533..bc87704e29 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -479,4 +480,8 @@ public class IndexRepairRegionScanner extends GlobalIndexRegionScanner {
         results.add(aggKeyValue);
         return hasMore || hasMoreIncr;
     }
+
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return next(result);
+    }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index 42e51d6606..0bb4249581 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -477,4 +478,8 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner {
     public long getMaxResultSize() {
         return scan.getMaxResultSize();
     }
+
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return next(result);
+    }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
index 719ce43731..66aa9598a8 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.PagingFilter;
 import org.apache.phoenix.filter.SkipScanFilter;
@@ -130,12 +131,20 @@ public class PagingRegionScanner extends BaseRegionScanner {
         private boolean hasMore() {
             return lookupPosition < pointLookupRanges.size();
         }
-        private boolean next(List<Cell> results, boolean raw, RegionScanner scanner)
-                throws IOException {
+        private boolean next(List<Cell> results, boolean raw, RegionScanner scanner,
+                ScannerContext scannerContext) throws IOException {
             try {
                 long startTime = EnvironmentEdgeManager.currentTimeMillis();
                 while (true) {
-                    if (raw ? scanner.nextRaw(results) : scanner.next(results)) {
+                    boolean hasMore;
+                    if (scannerContext != null) {
+                        hasMore = raw
+                                ? scanner.nextRaw(results, scannerContext)
+                                : scanner.next(results, scannerContext);
+                    } else {
+                        hasMore = raw ? scanner.nextRaw(results) : scanner.next(results);
+                    }
+                    if (hasMore) {
                         // Since each scan is supposed to return only one row (even when the
                         // start and stop row key are not the same, which happens after region
                         // moves or when there are delete markers in the table), this should not
@@ -197,7 +206,8 @@
         initialized = true;
     }
 
-    private boolean next(List<Cell> results, boolean raw) throws IOException {
+    private boolean next(List<Cell> results, boolean raw, ScannerContext scannerContext)
+            throws IOException {
         init();
         if (pagingFilter != null) {
             pagingFilter.init();
@@ -230,19 +240,26 @@
         } else {
             if (multiKeyPointLookup != null) {
-                RegionScanner regionScanner = multiKeyPointLookup.getNewScanner();
-                if (regionScanner == null) {
-                    return false;
-                }
-                delegate.close();
-                delegate = regionScanner;
+                    RegionScanner regionScanner = multiKeyPointLookup.getNewScanner();
+                    if (regionScanner == null) {
+                        return false;
+                    }
+                    delegate.close();
+                    delegate = regionScanner;
             }
         }
         if (multiKeyPointLookup != null) {
-            return multiKeyPointLookup.next(results, raw, delegate);
+            return multiKeyPointLookup.next(results, raw, delegate, scannerContext);
+        }
+        boolean hasMore;
+        if (scannerContext != null) {
+            hasMore = raw
+                    ? delegate.nextRaw(results, scannerContext)
+                    : delegate.next(results, scannerContext);
+        } else {
+            hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
         }
-        boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
         if (pagingFilter == null) {
             return hasMore;
         }
@@ -268,12 +285,22 @@
 
     @Override
     public boolean next(List<Cell> results) throws IOException {
-        return next(results, false);
+        return next(results, false, null);
    }
 
     @Override
     public boolean nextRaw(List<Cell> results) throws IOException {
-        return next(results, true);
+        return next(results, true, null);
+    }
+
+    @Override
+    public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
+        return next(results, false, scannerContext);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> results, ScannerContext scannerContext) throws IOException {
+        return next(results, true, scannerContext);
     }
 
     @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
index cc5d416513..810d965120 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
@@ -176,14 +176,12 @@ public class PhoenixTTLRegionObserver extends BaseScannerRegionObserver implemen
         @Override
         public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-            throw new IOException(
-                    "next with scannerContext should not be called in Phoenix environment");
+            return next(result);
         }
 
         @Override
         public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
-            throw new IOException(
-                    "NextRaw with scannerContext should not be called in Phoenix environment");
+            return nextRaw(result);
         }
 
         @Override
         public void close() throws IOException {
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 57fcc60c44..6a7c5c58fa 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -210,15 +211,32 @@ public class TTLRegionScanner extends BaseRegionScanner {
         return false;
     }
 
-    private boolean next(List<Cell> result, boolean raw) throws IOException {
+    private boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
+            throws IOException {
+        boolean hasMore;
         if (!isMaskingEnabled) {
-            return raw ? delegate.nextRaw(result) : delegate.next(result);
+            if (scannerContext != null) {
+                hasMore = raw
+                        ? delegate.nextRaw(result, scannerContext)
+                        : delegate.next(result, scannerContext);
+            } else {
+                hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+            }
+            return hasMore;
         }
         if (!initialized) {
             init();
             initialized = true;
         }
-        boolean hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+
+        if (scannerContext != null) {
+            hasMore = raw
+                    ? delegate.nextRaw(result, scannerContext)
+                    : delegate.next(result, scannerContext);
+        } else {
+            hasMore = raw ? delegate.nextRaw(result) : delegate.next(result);
+        }
+
         if (result.isEmpty() || ScanUtil.isDummy(result)) {
             return hasMore;
         }
@@ -235,12 +253,22 @@
 
     @Override
     public boolean next(List<Cell> results) throws IOException {
-        return next(results, false);
+        return next(results, false, null);
     }
 
     @Override
     public boolean nextRaw(List<Cell> results) throws IOException {
-        return next(results, true);
+        return next(results, true, null);
+    }
+
+    @Override
+    public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException {
+        return next(results, false, scannerContext);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> results, ScannerContext scannerContext) throws IOException {
+        return next(results, true, scannerContext);
     }
 
     @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index ec61489af7..691b9265ba 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.execute.TupleProjector;
@@ -359,6 +360,10 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
         }
     }
 
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return next(result);
+    }
+
     /**
      * A page of index rows are scanned and then their corresponding data table rows are retrieved
      * from the data table regions in parallel. These data rows are then joined with index rows.
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index a69b5a2e89..39b8733a0e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -129,4 +130,8 @@ public class UncoveredLocalIndexRegionScanne
         ServerIndexUtil.wrapResultUsingOffset(result, offset);
         return hasMore;
     }
+
+    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        return next(result);
+    }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b0b1d4196d..986c7209ab 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -750,6 +752,13 @@
     private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,
             final Region region, final Scan scan, Configuration config) throws IOException {
+        ScannerContext groupScannerContext;
+        if (scan.isScanMetricsEnabled()) {
+            groupScannerContext = ScannerContext.newBuilder()
+                    .setTrackMetrics(scan.isScanMetricsEnabled()).build();
+        } else {
+            groupScannerContext = null;
+        }
         StatsCollectionCallable callable =
                 new StatsCollectionCallable(stats, region, innerScanner, config, scan);
         byte[] asyncBytes = scan.getAttribute(BaseScannerRegionObserverConstants.RUN_UPDATE_STATS_ASYNC_ATTRIB);
@@ -799,6 +808,21 @@
             }
         }
 
+        @Override
+        public boolean next(List<Cell> results, ScannerContext scannerContext)
+                throws IOException {
+            if (groupScannerContext != null && scannerContext != null) {
+                ScannerContextUtil.updateMetrics(groupScannerContext, scannerContext);
+            }
+            return next(results);
+        }
+
+        @Override
+        public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
+                throws IOException {
+            return next(results, scannerContext);
+        }
+
         @Override
         public boolean next(List<Cell> results) throws IOException {
             results.add(aggKeyValue);
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 65f76c78e7..dc574c015e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.GlobalCache;
@@ -560,8 +561,20 @@
         }
     }
 
+    @Override
+    public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
+            throws IOException {
+        return next(results, scannerContext);
+    }
+
     @Override
     public boolean next(List<Cell> resultsToReturn) throws IOException {
+        return next(resultsToReturn, null);
+    }
+
+    @Override
+    public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
+            throws IOException {
         boolean hasMore;
         long startTime = EnvironmentEdgeManager.currentTimeMillis();
         Configuration conf = env.getConfiguration();
@@ -589,7 +602,9 @@
                 // Results are potentially returned even when the return value of s.next is false
                 // since this is an indication of whether or not there are more values after the
                 // ones returned
-                hasMore = innerScanner.nextRaw(results);
+                hasMore = (scannerContext == null)
+                        ? innerScanner.nextRaw(results)
+                        : innerScanner.nextRaw(results, scannerContext);
                 if (isDummy(results)) {
                     if (!hasAny) {
                         resultsToReturn.addAll(results);
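In collectStats() the actual scan runs inside StatsCollectionCallable on another thread, so no client ScannerContext is in scope while rows are read; the observer therefore owns a metrics-only context and replays its totals into whichever RPC context later polls the wrapper scanner. The shape of that handoff, distilled into a stand-in class:

    import org.apache.hadoop.hbase.regionserver.ScannerContext;
    import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;

    // Stand-in class: metrics accumulate in a privately owned, metrics-only
    // context while work happens off the RPC thread, then get replayed into
    // the caller's context on the next poll.
    final class DeferredScanMetrics {
        private final ScannerContext owned =
                ScannerContext.newBuilder().setTrackMetrics(true).build();

        ScannerContext owned() {
            return owned; // hand this to the background scan
        }

        void replayInto(ScannerContext rpcContext) {
            if (rpcContext != null) {
                ScannerContextUtil.updateMetrics(owned, rpcContext);
            }
        }
    }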
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 4dbf5c1f6d..62b8677103 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -252,7 +252,8 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg
             return true;
         }
 
-        public boolean next(List<Cell> result, boolean raw) throws IOException {
+        public boolean next(List<Cell> result, boolean raw, ScannerContext scannerContext)
+                throws IOException {
             try {
                 if (!initialized) {
                     init();
@@ -261,9 +262,13 @@
                 long startTime = EnvironmentEdgeManager.currentTimeMillis();
                 do {
                     if (raw) {
-                        hasMore = scanner.nextRaw(result);
+                        hasMore = (scannerContext == null)
+                                ? scanner.nextRaw(result)
+                                : scanner.nextRaw(result, scannerContext);
                     } else {
-                        hasMore = scanner.next(result);
+                        hasMore = (scannerContext == null)
+                                ? scanner.next(result)
+                                : scanner.next(result, scannerContext);
                     }
                     if (result.isEmpty()) {
                         return hasMore;
@@ -298,22 +303,22 @@
 
         @Override
         public boolean next(List<Cell> result) throws IOException {
-            return next(result, false);
+            return next(result, false, null);
         }
 
         @Override
         public boolean nextRaw(List<Cell> result) throws IOException {
-            return next(result, true);
+            return next(result, true, null);
         }
 
         @Override
         public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-            throw new IOException("next with scannerContext should not be called in Phoenix environment");
+            return next(result, false, scannerContext);
         }
 
         @Override
         public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
-            throw new IOException("NextRaw with scannerContext should not be called in Phoenix environment");
+            return next(result, true, scannerContext);
         }
 
         @Override
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 12873c9081..ddb04a9c82 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.cache.GlobalCache;
@@ -172,19 +174,21 @@
     if (scanOffset != null) {
       final boolean isIncompatibleClient =
           ScanUtil.isIncompatibleClientForServerReturnValidRowKey(scan);
+      RegionScannerResultIterator iterator = new RegionScannerResultIterator(scan,
+          innerScanner,
+          getMinMaxQualifiersFromScan(scan),
+          encodingScheme);
+      ScannerContext sc = iterator.getRegionScannerContext();
       innerScanner = getOffsetScanner(
           innerScanner,
           new OffsetResultIterator(
-              new RegionScannerResultIterator(
-                  innerScanner,
-                  getMinMaxQualifiersFromScan(scan),
-                  encodingScheme),
+              iterator,
               scanOffset,
               getPageSizeMsForRegionScanner(scan),
               isIncompatibleClient),
           scan.getAttribute(QueryConstants.LAST_SCAN) != null,
           isIncompatibleClient,
-          scan);
+          scan, sc);
     }
     boolean spoolingEnabled =
         env.getConfiguration().getBoolean(
@@ -194,21 +198,22 @@
         env.getConfiguration()
             .getLongBytes(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB,
                 QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES);
-    final OrderedResultIterator iterator =
-        deserializeFromScan(scan, innerScanner, spoolingEnabled, thresholdBytes);
+    OrderedResultIteratorWithScannerContext ic
+        = deserializeFromScan(scan, innerScanner, spoolingEnabled, thresholdBytes);
+    final OrderedResultIterator iterator = ic.getIterator();
     if (iterator == null) {
       return innerScanner;
     }
     // TODO:the above wrapped scanner should be used here also
-    return getTopNScanner(env, innerScanner, iterator, tenantId);
+    return getTopNScanner(env, innerScanner, iterator, tenantId, ic.getScannerContext());
   }
 
   @VisibleForTesting
-  static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s,
-      boolean spoolingEnabled, long thresholdBytes) {
+  static OrderedResultIteratorWithScannerContext deserializeFromScan(Scan scan, RegionScanner s,
+      boolean spoolingEnabled, long thresholdBytes) {
     byte[] topN = scan.getAttribute(BaseScannerRegionObserverConstants.TOPN);
     if (topN == null) {
-      return null;
+      return new OrderedResultIteratorWithScannerContext(null, null);
     }
     int clientVersion = ScanUtil.getClientVersion(scan);
     // Client including and after 4.15 and 5.1 are not going to serialize thresholdBytes
@@ -239,11 +244,14 @@
       }
       PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
-      ResultIterator inner = new RegionScannerResultIterator(s,
+      RegionScannerResultIterator inner = new RegionScannerResultIterator(scan, s,
          EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
-      return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
+      OrderedResultIterator iterator
+          = new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
           thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize,
           getPageSizeMsForRegionScanner(scan), scan, s.getRegionInfo());
+      return new OrderedResultIteratorWithScannerContext(inner.getRegionScannerContext(),
+          iterator);
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
@@ -255,6 +263,24 @@
     }
   }
 
+  private static class OrderedResultIteratorWithScannerContext {
+    private ScannerContext scannerContext;
+    private OrderedResultIterator iterator;
+
+    OrderedResultIteratorWithScannerContext(ScannerContext sc, OrderedResultIterator ori) {
+      this.scannerContext = sc;
+      this.iterator = ori;
+    }
+
+    public ScannerContext getScannerContext() {
+      return scannerContext;
+    }
+
+    public OrderedResultIterator getIterator() {
+      return iterator;
+    }
+  }
+
   private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s,
       Set<KeyValueColumnExpression> arrayKVRefs) {
     byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserverConstants.SPECIFIC_ARRAY_INDEX);
@@ -295,7 +321,7 @@
                                         final OffsetResultIterator iterator,
                                         final boolean isLastScan,
                                         final boolean incompatibleClient,
-                                        final Scan scan)
+                                        final Scan scan, final ScannerContext sc)
       throws IOException {
     final Tuple firstTuple;
     final Region region = getRegion();
@@ -372,6 +398,7 @@
     return new BaseRegionScanner(s) {
       private Tuple tuple = firstTuple;
       private byte[] previousResultRowKey;
+      private ScannerContext regionScannerContext = sc;
 
       @Override
       public boolean isFilterDone() {
@@ -380,6 +407,12 @@
 
       @Override
       public boolean next(List<Cell> results) throws IOException {
+        return next(results, null);
+      }
+
+      @Override
+      public boolean next(List<Cell> results, ScannerContext scannerContext)
+          throws IOException {
         try {
           if (isFilterDone()) {
             return false;
           }
@@ -413,6 +446,10 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
             }
           }
           tuple = nextTuple;
+          if (regionScannerContext != null) {
+            ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext);
+            regionScannerContext = null;
+          }
          return !isFilterDone();
         } catch (Throwable t) {
           LOGGER.error("Error while iterating Offset scanner.", t);
@@ -421,6 +458,12 @@
         }
       }
 
+      @Override
+      public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
+          throws IOException {
+        return next(results, scannerContext);
+      }
+
       @Override
       public void close() throws IOException {
         try {
@@ -475,7 +518,9 @@
    * since after this everything is held in memory
    */
   private RegionScanner getTopNScanner(RegionCoprocessorEnvironment env, final RegionScanner s,
-      final OrderedResultIterator iterator, ImmutableBytesPtr tenantId) throws Throwable {
+      final OrderedResultIterator iterator,
+      ImmutableBytesPtr tenantId, ScannerContext sc)
+      throws Throwable {
     final Tuple firstTuple;
     TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
@@ -498,6 +543,7 @@
     }
     return new BaseRegionScanner(s) {
       private Tuple tuple = firstTuple;
+      private ScannerContext regionScannerContext = sc;
 
       @Override
       public boolean isFilterDone() {
@@ -506,6 +552,12 @@
 
       @Override
       public boolean next(List<Cell> results) throws IOException {
+        return next(results, null);
+      }
+
+      @Override
+      public boolean next(List<Cell> results, ScannerContext scannerContext)
+          throws IOException {
         try {
           if (isFilterDone()) {
             return false;
           }
@@ -518,6 +570,10 @@
           }
          tuple = iterator.next();
+          if (regionScannerContext != null) {
+            ScannerContextUtil.updateMetrics(regionScannerContext, scannerContext);
+            regionScannerContext = null;
+          }
          return !isFilterDone();
         } catch (Throwable t) {
           ClientUtil.throwIOException(region.getRegionInfo().getRegionNameAsString(), t);
         }
       }
@@ -525,6 +581,12 @@
 
+      @Override
+      public boolean nextRaw(List<Cell> results, ScannerContext scannerContext)
+          throws IOException {
+        return next(results, scannerContext);
+      }
+
       @Override
       public void close() throws IOException {
         try {
s.next(results) + : s.next(results, scannerContext); if (ScanUtil.isDummy(results)) { return true; } @@ -221,10 +228,6 @@ public abstract class RegionScannerFactory { } } - @Override - public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { - throw new IOException("Next with scannerContext should not be called in Phoenix environment"); - } @Override public void close() throws IOException { @@ -253,8 +256,15 @@ public abstract class RegionScannerFactory { @Override public boolean nextRaw(List<Cell> result) throws IOException { + return nextRaw(result, null); + } + + @Override + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { try { - boolean next = s.nextRaw(result); + boolean next = (scannerContext == null) + ? s.nextRaw(result) + : s.nextRaw(result, scannerContext); if (ScanUtil.isDummy(result)) { return true; } @@ -309,6 +319,10 @@ public abstract class RegionScannerFactory { return false; } // There is a scanattribute set to retrieve the specific array element + if (scannerContext != null) { + ScannerContextUtil.incrementSizeProgress(scannerContext, result); + ScannerContextUtil.updateTimeProgress(scannerContext); + } return next; } catch (Throwable t) { ClientUtil.throwIOException(getRegion().getRegionInfo().getRegionNameAsString(), t); @@ -369,13 +383,6 @@ public abstract class RegionScannerFactory { return new Pair<>(tuple, new byte[0]); } - @Override - public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) - throws IOException { - boolean res = next(result); - ScannerContextUtil.incrementSizeProgress(scannerContext, result); - return res; - } /** * When there is a merge in progress while scanning local indexes we might get the key values less than scan start row. 
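The RegionScannerFactory hunks above all follow one delegation pattern: the ScannerContext-aware next()/nextRaw() overloads become the single implementation, and the no-argument variants call them with a null context. A condensed, self-contained sketch of that pattern (the wrapper class name is illustrative, not an actual Phoenix class):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;

// Illustrative wrapper demonstrating the delegation pattern used above.
class ContextForwardingScanner {
    private final RegionScanner delegate;

    ContextForwardingScanner(RegionScanner delegate) {
        this.delegate = delegate;
    }

    // The no-context overload funnels into the context-aware one with null,
    // leaving a single code path for both call sites.
    public boolean next(List<Cell> results) throws IOException {
        return next(results, null);
    }

    public boolean next(List<Cell> results, ScannerContext scannerContext)
            throws IOException {
        // Forward the caller's ScannerContext when present so HBase keeps
        // accumulating scan progress (such as rows scanned) into it;
        // otherwise fall back to the plain overload.
        return (scannerContext == null)
                ? delegate.next(results)
                : delegate.next(results, scannerContext);
    }
}

Keeping one code path means the same ScannerContext accrues size, time, and row-count progress no matter which overload the region server happens to invoke.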
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index b05696e758..d186ef881e 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -24,7 +24,9 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; @@ -43,12 +45,21 @@ public class RegionScannerResultIterator extends BaseResultIterator { private final Pair<Integer, Integer> minMaxQualifiers; private final boolean useQualifierAsIndex; private final QualifierEncodingScheme encodingScheme; + private final ScannerContext regionScannerContext; - public RegionScannerResultIterator(RegionScanner scanner, Pair<Integer, Integer> minMaxQualifiers, QualifierEncodingScheme encodingScheme) { + public RegionScannerResultIterator(Scan scan, RegionScanner scanner, + Pair<Integer, Integer> minMaxQualifiers, + QualifierEncodingScheme encodingScheme) { this.scanner = scanner; this.useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); this.minMaxQualifiers = minMaxQualifiers; this.encodingScheme = encodingScheme; + if (scan.isScanMetricsEnabled()) { + regionScannerContext = ScannerContext.newBuilder() + .setTrackMetrics(scan.isScanMetricsEnabled()).build(); + } else { + regionScannerContext = null; + } } @Override @@ -62,7 +73,12 @@ public class RegionScannerResultIterator extends BaseResultIterator { // Results are potentially returned even when the return value of s.next is false // since this is an indication of whether or not there are more values after the // ones returned - boolean hasMore = scanner.nextRaw(results); + boolean hasMore; + if (regionScannerContext == null) { + hasMore = scanner.nextRaw(results); + } else { + hasMore = scanner.nextRaw(results, regionScannerContext); + } if (!hasMore && results.isEmpty()) { return null; @@ -81,6 +97,9 @@ public class RegionScannerResultIterator extends BaseResultIterator { } } + public ScannerContext getRegionScannerContext() { + return regionScannerContext; + } @Override public String toString() { return "RegionScannerResultIterator [scanner=" + scanner + "]"; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java index b82ac71454..f9fb3fc9e4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java @@ -37,6 +37,7 @@ import java.util.Properties; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.types.PDate; @@ -427,6 +428,34 @@ public class ServerPagingIT extends ParallelStatsDisabledIT { } } + @Test + public void testNumberOfRPCsWithPaging() throws SQLException { + 
// insert 200 rows + Connection conn = DriverManager.getConnection(getUrl()); + String tableName = generateUniqueName(); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + stmt.execute("CREATE TABLE " + tableName + + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)"); + for (int i = 1; i <= 200; i++) { + String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i); + stmt.execute(sql); + } + conn.commit(); + + // delete every alternate row + for (int i=1; i<=200; i=i+2) { + stmt.execute("DELETE FROM " + tableName + " WHERE A = " + i); + conn.commit(); + } + + ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); + while (rs.next()) { + } + Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs); + long numRpc = getMetricValue(metrics, MetricType.COUNT_RPC_CALLS); + Assert.assertEquals(101, numRpc); + } + private void populateTable(String tableName) throws Exception { Connection conn = DriverManager.getConnection(getUrl()); conn.createStatement().execute("create table " + tableName + @@ -438,4 +467,15 @@ public class ServerPagingIT extends ParallelStatsDisabledIT { conn.commit(); conn.close(); } + + private long getMetricValue(Map<String, Map<MetricType, Long>> metrics, MetricType type) { + long result = 0; + for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) { + Long val = entry.getValue().get(type); + if (val != null) { + result += val.longValue(); + } + } + return result; + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java new file mode 100644 index 0000000000..d074d4a483 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/CountRowsScannedIT.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.phoenix.monitoring; + +import static org.junit.Assert.assertEquals; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Map; + +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.jdbc.PhoenixResultSet; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(NeedsOwnMiniClusterTest.class) +public class CountRowsScannedIT extends BaseTest { + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true"); + // force many rpc calls + props.put(QueryServices.SCAN_CACHE_SIZE_ATTRIB, "10"); + setUpTestDriver(new ReadOnlyProps(props)); + } + + @Test + public void testSinglePrimaryKey() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + String tableName = generateUniqueName(); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + stmt.execute("CREATE TABLE " + tableName + + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)"); + for (int i = 1; i <= 100; i++) { + String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i); + stmt.execute(sql); + } + conn.commit(); + + // both columns, but primary key 3 to 100, 98 rows + long count1 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE A >= 3 AND Z >= 7"); + assertEquals(98, count1); + + // primary key, 3 to 100, 98 rows + long count2 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE A >= 3"); + assertEquals(98, count2); + + // non-primary key, all rows + long count3 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE Z >= 7"); + assertEquals(100, count3); + + // primary key with limit, the first row + long count4 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE A >= 3 limit 1"); + assertEquals(1, count4); + + // non-primary key with limit, find the first Z >= 7, 1 to 7, 7 rows + long count5 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 limit 1"); + assertEquals(7, count5); + + // primary key with order by primary and limit + long count6 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE A >= 3 ORDER BY A limit 1"); + assertEquals(1, count6); + + // primary key with order by non-primary and limit + long count7 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE A >= 3 ORDER BY Z limit 1"); + assertEquals(98, count7); + + // select non-primary key with order by primary limit + long count8 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 ORDER BY A limit 1"); + assertEquals(7, count8); + + // select non-primary key with order by primary limit desc + // scan from the last, 1 row + long count9 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 ORDER BY A desc limit 1"); + assertEquals(1, count9); + + // select non-primary key 
with order by primary limit desc + // scan from the last until Z <= 60 matches, 41 rows + long count10 = countRowsScannedFromSql(stmt, "SELECT A,Z FROM " + tableName + + " WHERE Z >= 7 AND Z <= 60 ORDER BY A desc limit 1"); + assertEquals(41, count10); + + // select non-primary key with order by non-primary and limit, all rows + long count11 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE Z >= 7 ORDER BY Z limit 1"); + assertEquals(100, count11); + + // skip scan + long count12 = countRowsScannedFromSql(stmt, + "SELECT A,Z FROM " + tableName + " WHERE A in (20, 45, 68, 3)"); + assertEquals(4, count12); + } + + @Test + public void testMultiPrimaryKeys() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + String tableName = generateUniqueName(); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName + + " (A UNSIGNED_LONG NOT NULL, B UNSIGNED_LONG NOT NULL, " + + " Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A, B))"); + for (int i = 1; i <= 100; i++) { + String sql = String + .format("UPSERT INTO %s VALUES (%d, %d, %d)", tableName, (i % 5) + 1, i, i); + stmt.execute(sql); + } + conn.commit(); + + // pk1 and pk2, only needed rows are scanned + long count1 = countRowsScannedFromSql(stmt, + "SELECT A,B,Z FROM " + tableName + " WHERE A >= 2 AND B >= 3"); + assertEquals(79, count1); + + // pk2, all rows + long count2 = countRowsScannedFromSql(stmt, + "SELECT A,B,Z FROM " + tableName + " WHERE B >= 3"); + assertEquals(100, count2); + + // non-pk, all rows + long count3 = countRowsScannedFromSql(stmt, + "SELECT A,B,Z FROM " + tableName + " WHERE Z >= 7"); + assertEquals(100, count3); + + // non group aggregate, pk2 only, all rows + long count4 = countRowsScannedFromSql(stmt, "SELECT SUM(A) FROM " + tableName + + " WHERE B >= 3"); + assertEquals(100, count4); + + // pk1 and pk2, group by + long count5 = countRowsScannedFromSql(stmt, "SELECT B, SUM(A), SUM(Z) FROM " + tableName + + " WHERE A >= 2 AND B >= 3 GROUP BY B"); + assertEquals(79, count5); + + // pk1 and pk2, group by, ordered + long count6 = countRowsScannedFromSql(stmt, "SELECT B, SUM(A), SUM(Z) FROM " + tableName + + " WHERE A >= 2 AND B >= 3 GROUP BY B ORDER BY B DESC"); + assertEquals(79, count6); + } + + @Test + public void testQueryWithDeleteMarkers() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + String tableName = generateUniqueName(); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + stmt.execute("CREATE TABLE " + tableName + + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)"); + for (int i = 1; i <= 100; i++) { + String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i); + stmt.execute(sql); + } + conn.commit(); + String selectQuery = "SELECT A,Z FROM " + tableName + " LIMIT 1"; + for (int i=10; i<=100; i=i+10) { + stmt.execute("DELETE FROM " + tableName + " WHERE A < " + i); + conn.commit(); + long count = countRowsScannedFromSql(stmt, selectQuery); + assertEquals(i, count); + } + } + + @Test + public void testQueryIndex() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + stmt.execute("CREATE TABLE " + tableName + + " (A UNSIGNED_LONG NOT NULL PRIMARY KEY, Z UNSIGNED_LONG)"); + stmt.execute("CREATE INDEX " + indexName + " ON " + tableName +
"(Z) INCLUDE (A)"); + for (int i = 1; i <= 100; i++) { + String sql = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName, i, i); + stmt.execute(sql); + } + conn.commit(); + String selectQuery = "SELECT A FROM " + tableName + " WHERE Z > 49 AND Z < 71"; + long count = countRowsScannedFromSql(stmt, selectQuery); + assertEquals(21, count); + Assert.assertEquals(indexName, stmt.getQueryPlan().getTableRef().getTable().getTableName().toString()); + } + + @Test + public void testJoin() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + String tableName1 = generateUniqueName(); + String tableName2 = generateUniqueName(); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName1 + + " (A UNSIGNED_LONG NOT NULL, B UNSIGNED_LONG NOT NULL, " + + " Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A, B))"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName2 + + " (A UNSIGNED_LONG NOT NULL, B UNSIGNED_LONG NOT NULL, " + + " Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A, B))"); + // table1.B = table2.A + for (int i = 1; i <= 100; i++) { + // table1.B in [51, 150], table2.A in [1, 100] + String sql1 = String + .format("UPSERT INTO %s VALUES (%d, %d, %d)", tableName1, i, i + 50, i); + stmt.execute(sql1); + String sql2 = String.format("UPSERT INTO %s VALUES (%d, %d, %d)", tableName2, i, i, i); + stmt.execute(sql2); + } + + conn.commit(); + + // table1 + long count1 = countRowsScannedFromSql(stmt, + "SELECT * FROM " + tableName1 + " WHERE A >= 40"); + assertEquals(61, count1); + + // table2, all rows + long count2 = countRowsScannedFromSql(stmt, + "SELECT * FROM " + tableName2 + " WHERE B >= 20"); + assertEquals(100, count2); + + // join + String sqlJoin = "SELECT X.K, X.VX, Y.VY FROM ( SELECT B AS K, A AS VX FROM " + tableName1 + + " WHERE A >= 40) X JOIN (SELECT A AS K, B AS VY FROM " + tableName2 + + " WHERE B >= 20) Y ON X.K=Y.K"; + long count3 = countRowsScannedFromSql(stmt, sqlJoin); + assertEquals(161, count3); + } + + @Test + public void testUnionAll() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + String tableName1 = generateUniqueName(); + String tableName2 = generateUniqueName(); + PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName1 + + " (A UNSIGNED_LONG NOT NULL, Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (A))"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS " + tableName2 + + " (B UNSIGNED_LONG NOT NULL, Z UNSIGNED_LONG, CONSTRAINT pk PRIMARY KEY (B))"); + for (int i = 1; i <= 100; i++) { + String sql1 = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName1, i, i); + stmt.execute(sql1); + String sql2 = String.format("UPSERT INTO %s VALUES (%d, %d)", tableName2, i, i); + stmt.execute(sql2); + } + + conn.commit(); + + // table1 + long count1 = countRowsScannedFromSql(stmt, + "SELECT A, Z FROM " + tableName1 + " WHERE A >= 40"); + assertEquals(61, count1); + + // table2, all rows + long count2 = countRowsScannedFromSql(stmt, + "SELECT B, Z FROM " + tableName2 + " WHERE B >= 20"); + assertEquals(81, count2); + + // union all + String sqlUnionAll = "SELECT SUM(Z) FROM ( SELECT Z FROM " + tableName1 + + " WHERE A >= 40 UNION ALL SELECT Z FROM " + tableName2 + " WHERE B >= 20)"; + long count3 = countRowsScannedFromSql(stmt, sqlUnionAll); + assertEquals(142, count3); + + // union all then group by + String sqlUnionAllGroupBy = "SELECT K, SUM(Z) 
FROM ( SELECT A AS K, Z FROM " + tableName1 + + " WHERE A >= 40 UNION ALL SELECT B AS K, Z FROM " + tableName2 + + " WHERE B >= 20) GROUP BY K"; + long count4 = countRowsScannedFromSql(stmt, sqlUnionAllGroupBy); + assertEquals(142, count4); + } + + private long countRowsScannedFromSql(Statement stmt, String sql) throws SQLException { + ResultSet rs = stmt.executeQuery(sql); + while (rs.next()) { + // loop to the end + } + return getRowsScanned(rs); + } + + private long getRowsScanned(ResultSet rs) throws SQLException { + if (!(rs instanceof PhoenixResultSet)) { + return -1; + } + Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs); + + long sum = 0; + boolean valid = false; + for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) { + Long val = entry.getValue().get(MetricType.COUNT_ROWS_SCANNED); + if (val != null) { + sum += val.longValue(); + valid = true; + } + } + if (valid) { + return sum; + } else { + return -1; + } + } +} \ No newline at end of file diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java index 0dfc13e385..eaf4674b67 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/NonTxIndexBuilderTest.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.BaseRegionScanner; @@ -173,6 +174,10 @@ public class NonTxIndexBuilderTest extends BaseConnectionlessQueryTest { } return false; // indicate no more results } + + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + return next(result); + } }; }
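For reference, a minimal client-side sketch of consuming the new counter, assuming request-level metrics are enabled via the QueryServices.COLLECT_REQUEST_LEVEL_METRICS property (as in the CountRowsScannedIT setup above); the JDBC URL and table name are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Map;

import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.util.PhoenixRuntime;

public class RowsScannedExample {
    public static void main(String[] args) throws Exception {
        // Placeholder URL and table; metric collection must be enabled for
        // COUNT_ROWS_SCANNED to be populated.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT * FROM MY_TABLE")) {
            while (rs.next()) {
                // drain the result set so every underlying scan completes
            }
            // Read metrics are keyed by table name; sum the per-table values.
            Map<String, Map<MetricType, Long>> metrics =
                    PhoenixRuntime.getRequestReadMetricInfo(rs);
            long rowsScanned = 0;
            for (Map<MetricType, Long> tableMetrics : metrics.values()) {
                Long value = tableMetrics.get(MetricType.COUNT_ROWS_SCANNED);
                if (value != null) {
                    rowsScanned += value;
                }
            }
            System.out.println("HBase rows scanned: " + rowsScanned);
        }
    }
}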