This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 796c25fc57e5bd37dadd7f79fb79044e56d18c36 Author: Steve Carlin <[email protected]> AuthorDate: Mon Jan 27 09:49:20 2025 -0800 IMPALA-13716: Calcite Planner: TupleIsNullPredicate fix for analytic functions There is some special logic to materialize the TupleIsNullPredicate functions that are created by join nodes for outer joins for analytic functions. This commit refactors some of the code in the current Impala planner and materializes them with the Analytic RelNode. An example query from the test framework that causes this issue is: select avg(g) over (order by f) af3 from alltypestiny t1 left outer join (select id as a, coalesce(bigint_col, 30) as f, bigint_col as g from alltypestiny) t2 on (t1.id = t2.a); Change-Id: Iaec363c2fa93a1e21bf74a40e5399e21ddd9bd60 Reviewed-on: http://gerrit.cloudera.org:8080/22411 Reviewed-by: Aman Sinha <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../impala/analysis/TupleIsNullPredicate.java | 17 +++++++++ .../org/apache/impala/planner/AnalyticPlanner.java | 42 +++++++++++----------- .../apache/impala/planner/SingleNodePlanner.java | 11 +++++- .../impala/calcite/rel/node/ImpalaAnalyticRel.java | 13 ++++++- 4 files changed, 61 insertions(+), 22 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java b/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java index 915ab71f4..b66bce87c 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java +++ b/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java @@ -213,6 +213,23 @@ public class TupleIsNullPredicate extends Predicate { return expr; } + /** + * Given a list of expressions and a list of TupleIds, extract all the + * TupleIsNullPredicate expressions that are bound to any of the tuple Ids + * passed in. + */ + public static List<TupleIsNullPredicate> getUniqueBoundTupleIsNullPredicates( + List<Expr> exprs, List<TupleId> tids) { + + List<TupleIsNullPredicate> tupleIsNullPreds = new ArrayList<>(); + for (Expr expr : exprs) { + if (!expr.isBoundByTupleIds(tids)) continue; + expr.collect(TupleIsNullPredicate.class, tupleIsNullPreds); + } + Expr.removeDuplicates(tupleIsNullPreds); + return tupleIsNullPreds; + } + @Override public Expr clone() { return new TupleIsNullPredicate(this); } diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java index 12f3c854f..c6f54101e 100644 --- a/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/AnalyticPlanner.java @@ -94,6 +94,8 @@ public class AnalyticPlanner { * hash partitioning during the parallelization of 'root'. * Any unassigned conjuncts from 'analyzer_' are applied after analytic functions are * evaluated. + * A list of TupleIsNullPredicates is passed in as required by SortInfo (see comment + * in createSortInfo where the expressions are materialized). * TODO: when generating sort orders for the sort groups, optimize the ordering * of the partition exprs (so that subsequent sort operations see the input sorted * on a prefix of their required sort exprs) @@ -101,7 +103,8 @@ public class AnalyticPlanner { * (using the equivalence classes) rather than looking for expr equality */ public PlanNode createSingleNodePlan(PlanNode root, - List<Expr> groupingExprs, List<Expr> inputPartitionExprs) throws ImpalaException { + List<Expr> groupingExprs, List<Expr> inputPartitionExprs, + List<TupleIsNullPredicate> tupleIsNullPreds) throws ImpalaException { // If the plan node is not an EmptySetNode, identify predicates that reference the // logical analytic tuple (this logical analytic tuple is replaced by different // physical ones during planning) @@ -135,14 +138,17 @@ public class AnalyticPlanner { partitionGroups, groupingExprs, root.getNumInstances(), inputPartitionExprs); } + List<TupleIsNullPredicate> emptyPreds = new ArrayList<>(); for (int i = 0; i < partitionGroups.size(); ++i) { PartitionGroup partitionGroup = partitionGroups.get(i); for (int j = 0; j < partitionGroup.sortGroups.size(); ++j) { + boolean firstSortGroup = (i == 0) && (j == 0); boolean lastSortGroup = (i == partitionGroups.size() - 1) && (j == partitionGroup.sortGroups.size() - 1); root = createSortGroupPlan(root, partitionGroup.sortGroups.get(j), j == 0 ? partitionGroup.partitionByExprs : null, - lastSortGroup ? perPartitionLimits : null); + lastSortGroup ? perPartitionLimits : null, + firstSortGroup ? tupleIsNullPreds : emptyPreds); } } @@ -316,15 +322,19 @@ public class AnalyticPlanner { * on sortExprs. */ private SortInfo createSortInfo(PlanNode input, List<Expr> sortExprs, - List<Boolean> isAsc, List<Boolean> nullsFirst) { - return createSortInfo(input, sortExprs, isAsc, nullsFirst, TSortingOrder.LEXICAL); + List<Boolean> isAsc, List<Boolean> nullsFirst, + List<TupleIsNullPredicate> tupleIsNullPreds) { + return createSortInfo(input, sortExprs, isAsc, nullsFirst, tupleIsNullPreds, + TSortingOrder.LEXICAL); } /** * Same as above, but with extra parameter, sorting order. */ private SortInfo createSortInfo(PlanNode input, List<Expr> sortExprs, - List<Boolean> isAsc, List<Boolean> nullsFirst, TSortingOrder sortingOrder) { + List<Boolean> isAsc, List<Boolean> nullsFirst, + List<TupleIsNullPredicate> tupleIsNullPreds, + TSortingOrder sortingOrder) { List<Expr> inputSlotRefs = new ArrayList<>(); for (TupleId tid: input.getTupleIds()) { TupleDescriptor tupleDesc = analyzer_.getTupleDesc(tid); @@ -360,23 +370,13 @@ public class AnalyticPlanner { sortingOrder); sortInfo.createSortTupleInfo(inputSlotRefs, analyzer_); - // Lhs exprs to be substituted in ancestor plan nodes could have a rhs that contains - // TupleIsNullPredicates. TupleIsNullPredicates require specific tuple ids for + // TupleIsNullPredicates require specific tuple ids for // evaluation. Since this sort materializes a new tuple, it's impossible to evaluate // TupleIsNullPredicates referring to this sort's input after this sort, // To preserve the information whether an input tuple was null or not this sort node, - // we materialize those rhs TupleIsNullPredicates, which are then substituted + // we materialize those TupleIsNullPredicates, which are then substituted // by a SlotRef into the sort's tuple in ancestor nodes (IMPALA-1519). - if (inputSmap != null) { - List<TupleIsNullPredicate> tupleIsNullPreds = new ArrayList<>(); - for (Expr rhsExpr: inputSmap.getRhs()) { - // Ignore substitutions that are irrelevant at this plan node and its ancestors. - if (!rhsExpr.isBoundByTupleIds(input.getTupleIds())) continue; - rhsExpr.collect(TupleIsNullPredicate.class, tupleIsNullPreds); - } - Expr.removeDuplicates(tupleIsNullPreds); - sortInfo.addMaterializedExprs(tupleIsNullPreds, analyzer_); - } + sortInfo.addMaterializedExprs(tupleIsNullPreds, analyzer_); sortInfo.getSortTupleDescriptor().materializeSlots(); return sortInfo; } @@ -391,7 +391,8 @@ public class AnalyticPlanner { * Any placed limits have 'pushed' set to true. */ private PlanNode createSortGroupPlan(PlanNode root, SortGroup sortGroup, - List<Expr> partitionExprs, List<PartitionLimit> perPartitionLimits) + List<Expr> partitionExprs, List<PartitionLimit> perPartitionLimits, + List<TupleIsNullPredicate> tupleIsNullPreds) throws ImpalaException { List<Expr> partitionByExprs = sortGroup.partitionByExprs; List<OrderByElement> orderByElements = sortGroup.orderByElements; @@ -426,7 +427,8 @@ public class AnalyticPlanner { } } - SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst); + SortInfo sortInfo = createSortInfo(root, sortExprs, isAsc, nullsFirst, + tupleIsNullPreds); // IMPALA-8533: Avoid generating sort with empty tuple descriptor if(sortInfo.getSortTupleDescriptor().getSlots().size() > 0) { // Select the lowest limit to push into the sort. Other limit conjuncts will diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 0a5ad5986..c73b9b85f 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -296,7 +296,8 @@ public class SingleNodePlanner { List<Expr> inputPartitionExprs = new ArrayList<>(); root = analyticPlanner.createSingleNodePlan( - root, groupingExprs, inputPartitionExprs); + root, groupingExprs, inputPartitionExprs, + getTupleIsNullPreds(root)); if (multiAggInfo != null && !inputPartitionExprs.isEmpty() && multiAggInfo.getMaterializedAggClasses().size() == 1) { // analytic computation will benefit from a partition on inputPartitionExprs @@ -2359,4 +2360,12 @@ public class SingleNodePlanner { } return result; } + + private List<TupleIsNullPredicate> getTupleIsNullPreds(PlanNode planNode) { + if (planNode.getOutputSmap() == null) { + return new ArrayList<>(); + } + return TupleIsNullPredicate.getUniqueBoundTupleIsNullPredicates( + planNode.getOutputSmap().getRhs(), planNode.getTupleIds()); + } } diff --git a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java index f348c1299..50e191009 100644 --- a/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java +++ b/java/calcite-planner/src/main/java/org/apache/impala/calcite/rel/node/ImpalaAnalyticRel.java @@ -60,6 +60,7 @@ import org.apache.impala.analysis.FunctionParams; import org.apache.impala.analysis.OrderByElement; import org.apache.impala.analysis.SlotDescriptor; import org.apache.impala.analysis.SlotRef; +import org.apache.impala.analysis.TupleIsNullPredicate; import org.apache.impala.calcite.functions.AnalyzedAnalyticExpr; import org.apache.impala.calcite.functions.AnalyzedFunctionCallExpr; import org.apache.impala.calcite.functions.FunctionResolver; @@ -83,6 +84,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -142,8 +144,17 @@ public class ImpalaAnalyticRel extends Project AnalyticPlanner analyticPlanner = new AnalyticPlanner(analyticInfo, context.ctx_.getRootAnalyzer(), context.ctx_); + // TODO: IMPALA-12961. strip out null exprs which can exist when the tablescan does + // not output all of its columns. + List<Expr> nonNullExprs = inputNodeWithExprs.outputExprs_.stream() + .filter(e -> e != null).collect(Collectors.toList()); + List<TupleIsNullPredicate> tupleIsNullPreds = + TupleIsNullPredicate.getUniqueBoundTupleIsNullPredicates(nonNullExprs, + inputNodeWithExprs.planNode_.getTupleIds()); + PlanNode planNode = analyticPlanner.createSingleNodePlan( - inputNodeWithExprs.planNode_, Collections.emptyList(), new ArrayList<>()); + inputNodeWithExprs.planNode_, Collections.emptyList(), new ArrayList<>(), + tupleIsNullPreds); // Get a mapping of all expressions to its corresponding Impala Expr object. The // non-analytic expressions will have a RexInputRef type RexNode, while the
