Repository: phoenix Updated Branches: refs/heads/master cec2340d0 -> f9420e6fb
PHOENIX-3040 Don't use guideposts for executing queries serially Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f9420e6f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f9420e6f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f9420e6f Branch: refs/heads/master Commit: f9420e6fb8d635572a7049527db0cc513dbeebe6 Parents: cec2340 Author: Samarth <[email protected]> Authored: Thu Jun 30 22:13:07 2016 -0700 Committer: Samarth <[email protected]> Committed: Thu Jun 30 22:13:07 2016 -0700 ---------------------------------------------------------------------- .../phoenix/compile/ListJarsQueryPlan.java | 5 ++ .../org/apache/phoenix/compile/QueryPlan.java | 1 + .../apache/phoenix/compile/TraceQueryPlan.java | 5 ++ .../apache/phoenix/execute/AggregatePlan.java | 22 ++++--- .../phoenix/execute/DegenerateQueryPlan.java | 64 -------------------- .../phoenix/execute/DelegateQueryPlan.java | 5 ++ .../execute/LiteralResultIterationPlan.java | 5 ++ .../org/apache/phoenix/execute/ScanPlan.java | 58 ++++++++++++------ .../phoenix/execute/SortMergeJoinPlan.java | 5 ++ .../org/apache/phoenix/execute/UnionPlan.java | 9 +++ .../phoenix/iterate/BaseResultIterators.java | 30 +++------ .../apache/phoenix/jdbc/PhoenixStatement.java | 5 ++ .../org/apache/phoenix/query/QueryServices.java | 3 +- .../phoenix/query/QueryServicesOptions.java | 1 + .../query/ParallelIteratorsSplitTest.java | 5 ++ 15 files changed, 109 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java index b52e704..e3025cc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ListJarsQueryPlan.java @@ -251,4 +251,9 @@ public class ListJarsQueryPlan implements QueryPlan { public Operation getOperation() { return stmt.getUpdateOperation(); } + + @Override + public boolean isSerial() { + return true; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java index 7722483..f403e34 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryPlan.java @@ -87,4 +87,5 @@ public interface QueryPlan extends StatementPlan { */ public boolean useRoundRobinIterator() throws SQLException; + public boolean isSerial(); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java index 5e0977b..ed11b46 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/TraceQueryPlan.java @@ -259,4 +259,9 @@ public class TraceQueryPlan implements QueryPlan { public boolean useRoundRobinIterator() { return false; } + + @Override + public boolean isSerial() { + return true; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 82d854b..c439618 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -61,9 +61,7 @@ import org.apache.phoenix.parse.HintNode; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.types.PInteger; import org.apache.phoenix.util.ScanUtil; @@ -83,6 +81,7 @@ public class AggregatePlan extends BaseQueryPlan { private List<KeyRange> splits; private List<List<Scan>> scans; private static final Logger logger = LoggerFactory.getLogger(AggregatePlan.class); + private boolean isSerial; public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, @@ -100,6 +99,12 @@ public class AggregatePlan extends BaseQueryPlan { orderBy, groupBy, parallelIteratorFactory, dynamicFilter); this.having = having; this.aggregators = context.getAggregationManager().getAggregators(); + boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL); + boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table.getTable(), orderBy, context); + if (hasSerialHint && !canBeExecutedSerially) { + logger.warn("This query cannot be executed serially. Ignoring the hint"); + } + this.isSerial = hasSerialHint && canBeExecutedSerially; } public Expression getHaving() { @@ -207,13 +212,7 @@ public class AggregatePlan extends BaseQueryPlan { PInteger.INSTANCE.toBytes(limit + (offset == null ? 0 : offset))); } } - PTable table = tableRef.getTable(); - boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL); - boolean canBeExecutedSerially = ScanUtil.canQueryBeExecutedSerially(table, orderBy, context); - if (hasSerialHint && !canBeExecutedSerially) { - logger.warn("This query cannot be executed serially. Ignoring the hint"); - } - BaseResultIterators iterators = hasSerialHint && canBeExecutedSerially + BaseResultIterators iterators = isSerial ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan) : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false); @@ -266,4 +265,9 @@ public class AggregatePlan extends BaseQueryPlan { public boolean useRoundRobinIterator() throws SQLException { return false; } + + @Override + public boolean isSerial() { + return isSerial; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java deleted file mode 100644 index 5887ff3..0000000 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.execute; - -import java.sql.SQLException; -import java.util.Collections; -import java.util.List; - -import org.apache.hadoop.hbase.client.Scan; -import org.apache.phoenix.compile.GroupByCompiler.GroupBy; -import org.apache.phoenix.compile.OrderByCompiler.OrderBy; -import org.apache.phoenix.compile.RowProjector; -import org.apache.phoenix.compile.ScanRanges; -import org.apache.phoenix.compile.StatementContext; -import org.apache.phoenix.iterate.ParallelScanGrouper; -import org.apache.phoenix.iterate.ResultIterator; -import org.apache.phoenix.jdbc.PhoenixParameterMetaData; -import org.apache.phoenix.parse.FilterableStatement; -import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.schema.TableRef; - -public class DegenerateQueryPlan extends BaseQueryPlan { - - public DegenerateQueryPlan(StatementContext context, FilterableStatement statement, TableRef table) { - super(context, statement, table, RowProjector.EMPTY_PROJECTOR, PhoenixParameterMetaData.EMPTY_PARAMETER_META_DATA, null,null, OrderBy.EMPTY_ORDER_BY, GroupBy.EMPTY_GROUP_BY, null, null); - context.setScanRanges(ScanRanges.NOTHING); - } - - @Override - public List<KeyRange> getSplits() { - return Collections.emptyList(); - } - - @Override - public List<List<Scan>> getScans() { - return Collections.emptyList(); - } - - @Override - protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan scan) throws SQLException { - return null; - } - - @Override - public boolean useRoundRobinIterator() { - return false; - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java index 8f0d224..f282aea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateQueryPlan.java @@ -124,4 +124,9 @@ public abstract class DelegateQueryPlan implements QueryPlan { public Integer getOffset() { return delegate.getOffset(); } + + @Override + public boolean isSerial() { + return delegate.isSerial(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index db99964..38cb65e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -111,4 +111,9 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { return scanner; } + @Override + public boolean isSerial() { + return true; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 31354be..34354f3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -82,6 +82,8 @@ public class ScanPlan extends BaseQueryPlan { private List<KeyRange> splits; private List<List<Scan>> scans; private boolean allowPageFilter; + private boolean isSerial; + private boolean isDataToScanWithinThreshold; public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException { this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null); @@ -92,11 +94,16 @@ public class ScanPlan extends BaseQueryPlan { parallelIteratorFactory != null ? parallelIteratorFactory : buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter); this.allowPageFilter = allowPageFilter; - if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN + boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); + if (isOrdered) { // TopN int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); ScanRegionObserver.serializeIntoScan(context.getScan(), thresholdBytes, limit == null ? -1 : limit, orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize()); } + Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit; + perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset); + this.isDataToScanWithinThreshold = isAmountOfDataToScanWithinThreshold(context, table.getTable(), perScanLimit); + this.isSerial = isSerial(context, statement, tableRef, orderBy, isDataToScanWithinThreshold); } private static boolean isSerial(StatementContext context, FilterableStatement statement, @@ -118,23 +125,32 @@ public class ScanPlan extends BaseQueryPlan { private static boolean isAmountOfDataToScanWithinThreshold(StatementContext context, PTable table, Integer perScanLimit) throws SQLException { Scan scan = context.getScan(); - /* - * If a limit is not provided or if we have a filter, then we are not able to decide whether - * the amount of data we need to scan is less than the threshold. - */ + ConnectionQueryServices services = context.getConnection().getQueryServices(); + long estRowSize = SchemaUtil.estimateRowSize(table); + long regionSize = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE, + HConstants.DEFAULT_MAX_FILE_SIZE); if (perScanLimit == null || scan.getFilter() != null) { + /* + * If a limit is not provided or if we have a filter, then we are not able to decide whether + * the amount of data we need to scan is less than the threshold. + */ return false; + } else if (perScanLimit != null && scan.getFilter() == null) { + /* + * In presence of a limit and in absence of a filter, we are not relying on guide post info to + * see if we are beyond a threshold. + */ + float factor = + services.getProps().getFloat(QueryServices.NONFILTERED_AND_LIMITED_QUERY_SERIAL_THRESHOLD, + QueryServicesOptions.DEFAULT_NONFILTERED_LIMITED_QUERY_SERIAL_THRESHOLD); + return Float.compare(estRowSize * perScanLimit, factor * regionSize) < 0; } long scn = context.getConnection().getSCN() == null ? Long.MAX_VALUE : context.getConnection().getSCN(); PTableStats tableStats = context.getConnection().getQueryServices().getTableStats(table.getName().getBytes(), scn); GuidePostsInfo gpsInfo = tableStats.getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table)); - ConnectionQueryServices services = context.getConnection().getQueryServices(); - long estRowSize; - long estimatedParallelThresholdBytes; + long threshold; if (gpsInfo == null || gpsInfo.getGuidePostsCount() == 0) { - estRowSize = SchemaUtil.estimateRowSize(table); - estimatedParallelThresholdBytes = services.getProps().getLong(HConstants.HREGION_MAX_FILESIZE, - HConstants.DEFAULT_MAX_FILE_SIZE); + threshold = regionSize; } else { long totByteSize = 0; long totRowCount = 0; @@ -145,13 +161,13 @@ public class ScanPlan extends BaseQueryPlan { totRowCount += rowCount; } estRowSize = totByteSize / totRowCount; - estimatedParallelThresholdBytes = 2 + threshold = 2 * services.getProps().getLong(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_STATS_GUIDEPOST_WIDTH_BYTES); } - long limitThreshold = services.getProps().getLong(QueryServices.QUERY_PARALLEL_LIMIT_THRESHOLD, - estimatedParallelThresholdBytes); - return (perScanLimit * estRowSize < limitThreshold); + long thresholdToUse = services.getProps().getLong(QueryServices.FILTERED_OR_NONLIMITED_QUERY_SERIAL_THRESHOLD, + threshold); + return (perScanLimit * estRowSize < thresholdToUse); } private static ParallelIteratorFactory buildResultIteratorFactory(StatementContext context, FilterableStatement statement, @@ -211,10 +227,7 @@ public class ScanPlan extends BaseQueryPlan { * limit is provided, run query serially. */ boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); - Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit; - perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset); - boolean isDataWithinThreshold = isAmountOfDataToScanWithinThreshold(context, table, perScanLimit); - boolean isSerial = isSerial(context, statement, tableRef, orderBy, isDataWithinThreshold); + Integer perScanLimit = QueryUtil.getOffsetLimit(!allowPageFilter || isOrdered ? null : limit, offset); boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, offset, isSalted, table.getIndexType()); /* * For queries that are doing a row key order by and are not possibly querying more than a @@ -223,7 +236,7 @@ public class ScanPlan extends BaseQueryPlan { */ boolean initFirstScanOnly = (orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY || orderBy == OrderBy.REV_ROW_KEY_ORDER_BY) - && isDataWithinThreshold; + && isDataToScanWithinThreshold; BaseResultIterators iterators; if (isOffsetOnServer) { iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan); @@ -280,4 +293,9 @@ public class ScanPlan extends BaseQueryPlan { return ScanUtil.isRoundRobinPossible(orderBy, context); } + @Override + public boolean isSerial() { + return isSerial; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index 8e0e6e2..a8ad3eb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -672,4 +672,9 @@ public class SortMergeJoinPlan implements QueryPlan { return tableRefs; } + @Override + public boolean isSerial() { + return false; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java index cf95b5b..f60f81f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java @@ -231,4 +231,13 @@ public class UnionPlan implements QueryPlan { } return sources; } + + @Override + public boolean isSerial() { + boolean isSerial = true; + for (QueryPlan plan : getPlans()) { + isSerial &= plan.isSerial(); + } + return isSerial; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 7796a17..e3d512f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -142,15 +142,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } private boolean useStats() { - boolean isPointLookup = context.getScanRanges().isPointLookup(); /* - * Don't use guide posts if: - * 1) We're doing a point lookup, as HBase is fast enough at those - * to not need them to be further parallelized. TODO: perf test to verify - * 2) We're collecting stats, as in this case we need to scan entire - * regions worth of data to track where to put the guide posts. + * Don't use guide posts: + * 1) If we're collecting stats, as in this case we need to scan entire + * regions worth of data to track where to put the guide posts. + * 2) If the query is going to be executed serially. */ - if (isPointLookup || ScanUtil.isAnalyzeTable(scan)) { + if (ScanUtil.isAnalyzeTable(scan) || plan.isSerial()) { return false; } return true; @@ -423,11 +421,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } private GuidePostsInfo getGuidePosts(Set<byte[]> whereConditions) { - /* - * Don't use guide posts if: 1) We're doing a point lookup, as HBase is fast enough at those to not need them to - * be further parallelized. TODO: pref test to verify 2) We're collecting stats, as in this case we need to scan - * entire regions worth of data to track where to put the guide posts. - */ if (!useStats()) { return GuidePostsInfo.NO_GUIDEPOST; } GuidePostsInfo gps = null; @@ -629,7 +622,6 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } catch (EOFException e) {} } byte[] currentKeyBytes = currentKey.copyBytes(); - // Merge bisect with guideposts for all but the last region while (regionIndex <= stopIndex) { HRegionLocation regionLocation = regionLocations.get(regionIndex); @@ -649,11 +641,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result while (guideIndex < gpsSize && (currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) { Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, currentGuidePostBytes, keyOffset, false); - if(newScan != null) { + if (newScan != null) { ScanUtil.setLocalIndexAttributes(newScan, keyOffset, regionInfo.getStartKey(), regionInfo.getEndKey(), newScan.getStartRow(), newScan.getStopRow()); - } - if (newScan != null) { estimatedRows += gps.getRowCounts().get(guideIndex); estimatedSize += gps.getByteCounts().get(guideIndex); } @@ -673,12 +663,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result currentKeyBytes = endKey; regionIndex++; } - if (hasGuidePosts) { - this.estimatedRows = estimatedRows; - this.estimatedSize = estimatedSize; - } else if (scanRanges.isPointLookup()) { + if (scanRanges.isPointLookup()) { this.estimatedRows = 1L; this.estimatedSize = SchemaUtil.estimateRowSize(table); + } else if (hasGuidePosts) { + this.estimatedRows = estimatedRows; + this.estimatedSize = estimatedSize; } else { this.estimatedRows = null; this.estimatedSize = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 0c154e2..02303d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -573,6 +573,11 @@ public class PhoenixStatement implements Statement, SQLCloseable { public boolean useRoundRobinIterator() throws SQLException { return false; } + + @Override + public boolean isSerial() { + return true; + } }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index e255e61..c2acec1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -214,10 +214,11 @@ public interface QueryServices extends SQLCloseable { public static final String HCONNECTION_POOL_CORE_SIZE = "hbase.hconnection.threads.core"; public static final String HCONNECTION_POOL_MAX_SIZE = "hbase.hconnection.threads.max"; public static final String HTABLE_MAX_THREADS = "hbase.htable.threads.max"; - public static final String QUERY_PARALLEL_LIMIT_THRESHOLD = "phoenix.query.parallelThresholdBytes"; // time to wait before running second index population upsert select (so that any pending batches of rows on region server are also written to index) public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time"; public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade"; + public static final String NONFILTERED_AND_LIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.nonfiltered.limited.query.serial.threshold"; + public static final String FILTERED_OR_NONLIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.filtered.limited.query.serial.threshold"; /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index cb646a1..57b60c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -248,6 +248,7 @@ public class QueryServicesOptions { (3 * DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD) / 4; public static final int DEFAULT_RENEW_LEASE_THREAD_POOL_SIZE = 10; public static final boolean DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE = true; + public static final float DEFAULT_NONFILTERED_LIMITED_QUERY_SERIAL_THRESHOLD = 0.2f; @SuppressWarnings("serial") public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f9420e6f/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java index 05fbf81..8c65937 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java @@ -471,6 +471,11 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest { public boolean useRoundRobinIterator() { return false; } + + @Override + public boolean isSerial() { + return true; + } }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false); List<KeyRange> keyRanges = parallelIterators.getSplits();
