IMPALA-5004: Switch to sorting node for large TopN queries. Adds a new query option 'topn_bytes_limit' that places a limit on the number of estimated bytes that a TopN operator can process. If the Impala planner estimates that a TopN operator will process more bytes than this limit, it will replace the TopN operator with a sort operator.
Since the TopN operator cannot spill to disk, it has to buffer everything in memory. This can cause frequent OOM issues when running with a large limit + offset. Switching to a sort operator allows Impala to spill to disk. We prefer to use the TopN operator when possible as it has better performance than the sort operator for 'order by limit [offset]' queries. The default limit is set to 512MB and is based on micro-benchmarking the topn vs. sort operator for various limits (see the JIRA for full details). The default is set to an intentionally high value in order to avoid performance regressions. Testing: * Added a new planner test to functional-planner/ to validate that 'topn_bytes_limit' properly switches between topn and sort operators. Change-Id: I34c9db33c9302b55e9978f53f9c7061f2806c8a9 Reviewed-on: http://gerrit.cloudera.org:8080/11698 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Tim Armstrong <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3a7e3828 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3a7e3828 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3a7e3828 Branch: refs/heads/branch-3.1.0 Commit: 3a7e382805d1d19a3a9c7c69c7ff1adce9677675 Parents: 030f0ac Author: stakiar <[email protected]> Authored: Thu Oct 4 16:50:53 2018 -0500 Committer: Zoltan Borok-Nagy <[email protected]> Committed: Tue Nov 13 12:51:39 2018 +0100 ---------------------------------------------------------------------- be/src/service/query-options-test.cc | 1 + be/src/service/query-options.cc | 6 ++ be/src/service/query-options.h | 3 +- common/thrift/ImpalaInternalService.thrift | 4 ++ common/thrift/ImpalaService.thrift | 6 ++ .../org/apache/impala/analysis/SortInfo.java | 14 ++++ .../apache/impala/planner/AggregationNode.java | 2 +- .../apache/impala/planner/AnalyticEvalNode.java | 2 +- .../impala/planner/DataSourceScanNode.java | 2 +- 
.../impala/planner/DistributedPlanner.java | 2 +- .../apache/impala/planner/HBaseScanNode.java | 2 +- .../org/apache/impala/planner/HdfsScanNode.java | 2 +- .../org/apache/impala/planner/JoinNode.java | 2 +- .../org/apache/impala/planner/KuduScanNode.java | 2 +- .../org/apache/impala/planner/PlanNode.java | 12 ++-- .../org/apache/impala/planner/SelectNode.java | 2 +- .../impala/planner/SingleNodePlanner.java | 54 +++++++++++---- .../org/apache/impala/planner/SortNode.java | 7 +- .../org/apache/impala/planner/SubplanNode.java | 2 +- .../org/apache/impala/planner/UnionNode.java | 2 +- .../org/apache/impala/planner/UnnestNode.java | 2 +- .../org/apache/impala/planner/PlannerTest.java | 16 ++++- .../PlannerTest/topn-bytes-limit-small.test | 72 ++++++++++++++++++++ .../queries/PlannerTest/topn-bytes-limit.test | 23 +++++++ .../QueryTest/spilling-no-debug-action.test | 1 + 25 files changed, 205 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/be/src/service/query-options-test.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index 114d830..059b355 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -144,6 +144,7 @@ TEST(QueryOptions, SetByteOptions) { {MAKE_OPTIONDEF(compute_stats_min_sample_size), {-1, I64_MAX}}, {MAKE_OPTIONDEF(max_mem_estimate_for_admission), {-1, I64_MAX}}, {MAKE_OPTIONDEF(scan_bytes_limit), {-1, I64_MAX}}, + {MAKE_OPTIONDEF(topn_bytes_limit), {-1, I64_MAX}}, }; vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{ {MAKE_OPTIONDEF(runtime_filter_min_size), http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/be/src/service/query-options.cc ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.cc 
b/be/src/service/query-options.cc index 1424896..bb49762 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -714,6 +714,12 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_cpu_limit_s(cpu_limit_s); break; } + case TImpalaQueryOptions::TOPN_BYTES_LIMIT: { + int64_t topn_bytes_limit; + RETURN_IF_ERROR(ParseMemValue(value, "topn bytes limit", &topn_bytes_limit)); + query_options->__set_topn_bytes_limit(topn_bytes_limit); + break; + } default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/be/src/service/query-options.h ---------------------------------------------------------------------- diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 46cdf05..95263f7 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // the DCHECK. #define QUERY_OPTS_TABLE\ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\ - TImpalaQueryOptions::CPU_LIMIT_S + 1);\ + TImpalaQueryOptions::TOPN_BYTES_LIMIT + 1);\ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\ @@ -143,6 +143,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> QUERY_OPT_FN(scan_bytes_limit, SCAN_BYTES_LIMIT,\ TQueryOptionLevel::ADVANCED)\ QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT)\ + QUERY_OPT_FN(topn_bytes_limit, TOPN_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\ ; /// Enforce practical limits on some query options to avoid undesired query state. 
http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 7222dae..47f43e2 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -304,6 +304,10 @@ struct TQueryOptions { // See comment in ImpalaService.thrift. 72: optional i64 cpu_limit_s = 0; + + // See comment in ImpalaService.thrift + // The default value is set to 512MB based on empirical data + 73: optional i64 topn_bytes_limit = 536870912; } // Impala currently has two types of sessions: Beeswax and HiveServer2 http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/common/thrift/ImpalaService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 05a1431..758617d 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -339,6 +339,12 @@ enum TImpalaQueryOptions { // Note that until IMPALA-7318 is fixed, CPU usage can be very stale and this may not // terminate queries soon enough. CPU_LIMIT_S, + + // The max number of estimated bytes a TopN operator is allowed to materialize, if the + // planner thinks a TopN operator will exceed this limit, it falls back to a TotalSort + // operator which is capable of spilling to disk (unlike the TopN operator which keeps + // everything in memory). 0 or -1 means this has no effect. + TOPN_BYTES_LIMIT, } // The summary of a DML statement. 
http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/analysis/SortInfo.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java index fba7286..8472725 100644 --- a/fe/src/main/java/org/apache/impala/analysis/SortInfo.java +++ b/fe/src/main/java/org/apache/impala/analysis/SortInfo.java @@ -16,11 +16,13 @@ // under the License. package org.apache.impala.analysis; + import java.util.Collection; import java.util.List; import java.util.Set; import org.apache.impala.common.TreeNode; +import org.apache.impala.planner.PlanNode; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; @@ -233,6 +235,18 @@ public class SortInfo { } /** + * Estimates the size of the data materialized in memory by the TopN operator. The + * method uses the formula <code>estimatedSize = estimated # of rows in memory * + * average tuple serialized size</code>. 'cardinality' is the cardinality of the TopN + * operator and 'offset' is the value in the 'OFFSET [x]' clause. + */ + public long estimateTopNMaterializedSize(long cardinality, long offset) { + getSortTupleDescriptor().computeMemLayout(); + return (long) Math.ceil(getSortTupleDescriptor().getAvgSerializedSize() + * (PlanNode.checkedAdd(cardinality, offset))); + } + + /** * Returns the subset of 'sortExprs_' that should be materialized. 
A sort expr is * is materialized if it: * - contains a non-deterministic expr http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/AggregationNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java index ab234e4..72694ca 100644 --- a/fe/src/main/java/org/apache/impala/planner/AggregationNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AggregationNode.java @@ -249,7 +249,7 @@ public class AggregationNode extends PlanNode { cardinality_ = Math.min(getChild(0).getCardinality(), cardinality_); } } - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); } /** http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java index 5ca666c..86e5166 100644 --- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java +++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java @@ -138,7 +138,7 @@ public class AnalyticEvalNode extends PlanNode { protected void computeStats(Analyzer analyzer) { super.computeStats(analyzer); cardinality_ = getChild(0).cardinality_; - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); } @Override http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java index 
1ddb394..d4740e8 100644 --- a/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/DataSourceScanNode.java @@ -268,7 +268,7 @@ public class DataSourceScanNode extends ScanNode { cardinality_ = numRowsEstimate_; cardinality_ *= computeSelectivity(); cardinality_ = Math.max(1, cardinality_); - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); if (LOG.isTraceEnabled()) { LOG.trace("computeStats DataSourceScan: cardinality=" + Long.toString(cardinality_)); http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index ac3115b..05563b0 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -1054,7 +1054,7 @@ public class DistributedPlanner { Preconditions.checkState(node == childSortNode); if (hasLimit) { childSortNode.unsetLimit(); - childSortNode.setLimit(limit + offset); + childSortNode.setLimit(PlanNode.checkedAdd(limit, offset)); } childSortNode.setOffset(0); childSortNode.computeStats(ctx_.getRootAnalyzer()); http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java index 13ecb6a..1b60ad4 100644 --- a/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HBaseScanNode.java @@ -226,7 +226,7 @@ public class HBaseScanNode extends ScanNode { cardinality_ 
*= computeSelectivity(); cardinality_ = Math.max(1, cardinality_); - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); if (LOG.isTraceEnabled()) { LOG.trace("computeStats HbaseScan: cardinality=" + Long.toString(cardinality_)); } http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 0246d8c..c1ff092 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1054,7 +1054,7 @@ public class HdfsScanNode extends ScanNode { // IMPALA-2165: Avoid setting the cardinality to 0 after rounding. cardinality_ = Math.max(cardinality_, 1); } - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); if (LOG.isTraceEnabled()) { LOG.trace("HdfsScan: cardinality_=" + Long.toString(cardinality_)); } http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/JoinNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/JoinNode.java b/fe/src/main/java/org/apache/impala/planner/JoinNode.java index e8adcfa..cc50318 100644 --- a/fe/src/main/java/org/apache/impala/planner/JoinNode.java +++ b/fe/src/main/java/org/apache/impala/planner/JoinNode.java @@ -617,7 +617,7 @@ public abstract class JoinNode extends PlanNode { break; } } - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); Preconditions.checkState(hasValidStats()); if (LOG.isTraceEnabled()) { LOG.trace("stats Join: cardinality=" + Long.toString(cardinality_)); 
http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java index adeaa72..fc1f371 100644 --- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java @@ -265,7 +265,7 @@ public class KuduScanNode extends ScanNode { inputCardinality_ = cardinality_ = kuduTable_.getNumRows(); cardinality_ *= computeSelectivity(); cardinality_ = Math.min(Math.max(1, cardinality_), kuduTable_.getNumRows()); - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); if (LOG.isTraceEnabled()) { LOG.trace("computeStats KuduScan: cardinality=" + Long.toString(cardinality_)); } http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/PlanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 744aa09..7751eeb 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -521,17 +521,17 @@ abstract public class PlanNode extends TreeNode<PlanNode> { if (!children_.isEmpty()) numNodes_ = getChild(0).numNodes_; } - protected long capAtLimit(long cardinality) { + protected long capCardinalityAtLimit(long cardinality) { if (hasLimit()) { - if (cardinality == -1) { - return limit_; - } else { - return Math.min(cardinality, limit_); - } + return capCardinalityAtLimit(cardinality, limit_); } return cardinality; } + static long capCardinalityAtLimit(long cardinality, long limit) { + return cardinality == -1 ? 
limit : Math.min(cardinality, limit); + } + /** * Call computeMemLayout() for all materialized tuples. */ http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/SelectNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SelectNode.java b/fe/src/main/java/org/apache/impala/planner/SelectNode.java index 3ffc975..7b637e6 100644 --- a/fe/src/main/java/org/apache/impala/planner/SelectNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SelectNode.java @@ -75,7 +75,7 @@ public class SelectNode extends PlanNode { Math.round(((double) getChild(0).cardinality_) * computeSelectivity()); Preconditions.checkState(cardinality_ >= 0); } - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); if (LOG.isTraceEnabled()) { LOG.trace("stats Select: cardinality=" + Long.toString(cardinality_)); } http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 313597e..740cbfa 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -48,6 +48,7 @@ import org.apache.impala.analysis.SingularRowSrcTableRef; import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotId; import org.apache.impala.analysis.SlotRef; +import org.apache.impala.analysis.SortInfo; import org.apache.impala.analysis.TableRef; import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleId; @@ -294,20 +295,8 @@ public class SingleNodePlanner { } if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) { - long 
limit = stmt.getLimit(); - // TODO: External sort could be used for very large limits - // not just unlimited order-by - boolean useTopN = stmt.hasLimit() && !disableTopN; - if (useTopN) { - root = SortNode.createTopNSortNode( - ctx_.getNextNodeId(), root, stmt.getSortInfo(), stmt.getOffset()); - } else { - root = SortNode.createTotalSortNode( - ctx_.getNextNodeId(), root, stmt.getSortInfo(), stmt.getOffset()); - } - Preconditions.checkState(root.hasValidStats()); - root.setLimit(limit); - root.init(analyzer); + root = createSortNode(analyzer, root, stmt.getSortInfo(), stmt.getLimit(), + stmt.getOffset(), stmt.hasLimit(), disableTopN); } else { root.setLimit(stmt.getLimit()); root.computeStats(analyzer); @@ -317,6 +306,43 @@ public class SingleNodePlanner { } /** + * Creates and initializes either a SortNode or a TopNNode depending on various + * heuristics and configuration parameters. + */ + private SortNode createSortNode(Analyzer analyzer, PlanNode root, SortInfo sortInfo, + long limit, long offset, boolean hasLimit, boolean disableTopN) + throws ImpalaException { + SortNode sortNode; + long topNBytesLimit = ctx_.getQueryOptions().topn_bytes_limit; + + if (hasLimit && !disableTopN) { + if (topNBytesLimit <= 0) { + sortNode = + SortNode.createTopNSortNode(ctx_.getNextNodeId(), root, sortInfo, offset); + } else { + long topNCardinality = PlanNode.capCardinalityAtLimit(root.cardinality_, limit); + long estimatedTopNMaterializedSize = + sortInfo.estimateTopNMaterializedSize(topNCardinality, offset); + + if (estimatedTopNMaterializedSize < topNBytesLimit) { + sortNode = + SortNode.createTopNSortNode(ctx_.getNextNodeId(), root, sortInfo, offset); + } else { + sortNode = + SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, offset); + } + } + } else { + sortNode = + SortNode.createTotalSortNode(ctx_.getNextNodeId(), root, sortInfo, offset); + } + Preconditions.checkState(sortNode.hasValidStats()); + sortNode.setLimit(limit); + sortNode.init(analyzer); + 
return sortNode; + } + + /** * If there are unassigned conjuncts that are bound by tupleIds or if there are slot * equivalences for tupleIds that have not yet been enforced, returns a SelectNode on * top of root that evaluates those conjuncts; otherwise returns root unchanged. http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/SortNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java index ccef721..e23b933 100644 --- a/fe/src/main/java/org/apache/impala/planner/SortNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java @@ -164,7 +164,7 @@ public class SortNode extends PlanNode { @Override protected void computeStats(Analyzer analyzer) { super.computeStats(analyzer); - cardinality_ = capAtLimit(getChild(0).cardinality_); + cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_); if (LOG.isTraceEnabled()) { LOG.trace("stats Sort: cardinality=" + Long.toString(cardinality_)); } @@ -257,9 +257,8 @@ public class SortNode extends PlanNode { public void computeNodeResourceProfile(TQueryOptions queryOptions) { Preconditions.checkState(hasValidStats()); if (type_ == TSortType.TOPN) { - long perInstanceMemEstimate = - (long) Math.ceil((cardinality_ + offset_) * avgRowSize_); - nodeResourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate); + nodeResourceProfile_ = ResourceProfile.noReservation( + getSortInfo().estimateTopNMaterializedSize(cardinality_, offset_)); return; } http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/SubplanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java index a22c397..5b1e323 100644 --- 
a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java @@ -91,7 +91,7 @@ public class SubplanNode extends PlanNode { } else { cardinality_ = -1; } - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); } @Override http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/UnionNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/UnionNode.java b/fe/src/main/java/org/apache/impala/planner/UnionNode.java index a8cda0d..00a6d20 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnionNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnionNode.java @@ -124,7 +124,7 @@ public class UnionNode extends PlanNode { // are inline views (e.g. select 1 FROM (VALUES(1 x, 1 y)) a FULL OUTER JOIN // (VALUES(1 x, 1 y)) b ON (a.x = b.y)). We need to set the correct value. if (numNodes_ == -1) numNodes_ = 1; - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); if (LOG.isTraceEnabled()) { LOG.trace("stats Union: cardinality=" + Long.toString(cardinality_)); } http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/main/java/org/apache/impala/planner/UnnestNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java index 7e0a87e..857a949 100644 --- a/fe/src/main/java/org/apache/impala/planner/UnnestNode.java +++ b/fe/src/main/java/org/apache/impala/planner/UnnestNode.java @@ -70,7 +70,7 @@ public class UnnestNode extends PlanNode { // The containing SubplanNode has not yet been initialized, so get the number // of nodes from the SubplanNode's input. 
numNodes_ = containingSubplanNode_.getChild(0).getNumNodes(); - cardinality_ = capAtLimit(cardinality_); + cardinality_ = capCardinalityAtLimit(cardinality_); } @Override http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/fe/src/test/java/org/apache/impala/planner/PlannerTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index e8e4fb8..2ec4b15 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -193,7 +193,21 @@ public class PlannerTest extends PlannerTestBase { @Test public void testTopN() { - runPlannerTestFile("topn"); + TQueryOptions options = new TQueryOptions(); + options.setTopn_bytes_limit(0); + runPlannerTestFile("topn", options); + } + + @Test + public void testTopNBytesLimit() { + runPlannerTestFile("topn-bytes-limit"); + } + + @Test + public void testTopNBytesLimitSmall() { + TQueryOptions options = new TQueryOptions(); + options.setTopn_bytes_limit(6); + runPlannerTestFile("topn-bytes-limit-small", options); } @Test http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test new file mode 100644 index 0000000..db3e9a5 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit-small.test @@ -0,0 +1,72 @@ +# topn_bytes_limit is set to 6 so a limit of 1 will return a single int +# a single int is 4 bytes, which is under the limit of 6 so a TOP-N should be triggered +select int_col from functional.alltypes order by 1 limit 1 
+---- PLAN +PLAN-ROOT SINK +| +01:TOP-N [LIMIT=1] +| order by: int_col ASC +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +02:MERGING-EXCHANGE [UNPARTITIONED] +| order by: int_col ASC +| limit: 1 +| +01:TOP-N [LIMIT=1] +| order by: int_col ASC +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# returns 2 ints, with a total size of 8 bytes, which exceeds the limit of 6 and thus triggers a SORT +select int_col from functional.alltypes order by 1 limit 2 +---- PLAN +PLAN-ROOT SINK +| +01:SORT [LIMIT=2] +| order by: int_col ASC +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +02:MERGING-EXCHANGE [UNPARTITIONED] +| order by: int_col ASC +| limit: 2 +| +01:SORT [LIMIT=2] +| order by: int_col ASC +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== +# test that offset is taken into account; the query only returns a single row but needs to sort two rows +# sorting two ints requires 8 bytes of memory, which exceeds the threshold of 6 +select int_col from functional.alltypes order by 1 limit 1 offset 1 +---- PLAN +PLAN-ROOT SINK +| +01:SORT [LIMIT=1 OFFSET=1] +| order by: int_col ASC +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +02:MERGING-EXCHANGE [UNPARTITIONED] +| offset: 1 +| order by: int_col ASC +| limit: 1 +| +01:SORT [LIMIT=2] +| order by: int_col ASC +| +00:SCAN HDFS [functional.alltypes] + partitions=24/24 files=24 size=478.45KB +==== http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test new file mode 100644 index 0000000..9ad000e --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/topn-bytes-limit.test @@ -0,0 +1,23 @@ +# check that topn is triggered for low limits with the default value of topn_bytes_limit +select id from functional.alltypestiny order by id limit 7 +---- PLAN +PLAN-ROOT SINK +| +01:TOP-N [LIMIT=7] +| order by: id ASC +| +00:SCAN HDFS [functional.alltypestiny] + partitions=4/4 files=4 size=460B +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +02:MERGING-EXCHANGE [UNPARTITIONED] +| order by: id ASC +| limit: 7 +| +01:TOP-N [LIMIT=7] +| order by: id ASC +| +00:SCAN HDFS [functional.alltypestiny] + partitions=4/4 files=4 size=460B +==== \ No newline at end of file http://git-wip-us.apache.org/repos/asf/impala/blob/3a7e3828/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test index 7fe1c96..c82ea17 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test @@ -79,6 +79,7 @@ Memory limit exceeded ---- QUERY # Top-N query with large limit that will OOM because spilling is not implemented: # IMPALA-3471. It does not need any help from DEBUG_ACTION. +set topn_bytes_limit=-1; set mem_limit=100m; select * from lineitem
