IMPALA-3167: Fix assignment of WHERE conjunct through grouping agg + OJ.

Background:
We generally allow the assignment of predicates below the nullable side
of a left/right outer join, as the following example shows:

SELECT * FROM t1 LEFT OUTER JOIN t2 ON t1.id = t2.id
WHERE t2.int_col < 10

The scan of 't2' picks up 't2.int_col < 10' via
Analyzer.getBoundPredicates() and recognizes that the predicate must
also be evaluated by a join later, so the predicate is not marked as
assigned. The join then picks up the unassigned predicate via
Analyzer.getUnassignedConjuncts().

The bug was that our logic for detecting whether a bound predicate must
also be evaluated at a join node was flawed because it only considered
whether the tuples of the source or destination predicate were outer
joined (plus other conditions). The underlying assumption is that either
the source or destination tuple is bound by a tuple produced by a
TableRef, but in the buggy query the source predicate is bound by an
aggregation tuple, so we incorrectly marked the bound predicate as
assigned in Analyzer.getBoundPredicates().

The fix is to conservatively not mark bound predicates as assigned if
the slots referenced by the predicate have equivalent slots that belong
to an outer-joined tuple.

As a result, a plan node may pick up the same predicate multiple times,
once via Analyzer.getBoundPredicates() and another time via
Analyzer.getUnassignedConjuncts(). Those duplicates are now removed.

The following example explains the duplicate predicate assignment:

SELECT * FROM (SELECT * FROM t t1) a LEFT OUTER JOIN t b ON a.id = b.id
WHERE a.id < 10

1. The predicate 'a.id < 10' gets migrated into the inline view.
   'a.id < 10' is marked as assigned but is still registered as a
   single-tid conjunct in the Analyzer for potential propagation.
2. The scan node of 't1' calls Analyzer.getBoundPredicates() and
   generates 't1.id < 10' based on the source predicate 'a.id < 10'.
3. The scan node of 't1' picks up the migrated conjunct 't1.id < 10'
   via Analyzer.getUnassignedConjuncts().
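
The deduplication of conjuncts picked up twice by the same plan node can
be sketched in isolation. The following is a minimal, stand-alone
illustration that mirrors the copy-clear-readd shape of the rewritten
Expr.removeDuplicates() in this change; the class name DedupSketch, the
generic element type, and the string "conjuncts" are hypothetical
stand-ins and not part of the Impala code base.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch; not part of the Impala code base.
class DedupSketch {
  // Removes duplicates from 'l' in place. The first occurrence of each
  // element is kept and the original order is preserved. Equality is
  // decided by equals(), because List.contains() uses equals().
  static <T> void removeDuplicates(List<T> l) {
    if (l == null) return;
    List<T> origList = new ArrayList<>(l);
    l.clear();
    for (T item : origList) {
      if (!l.contains(item)) l.add(item);
    }
  }

  public static void main(String[] args) {
    // Strings stand in for conjuncts that a scan node received once via
    // getBoundPredicates() and once via getUnassignedConjuncts().
    List<String> conjuncts = new ArrayList<>(Arrays.asList(
        "t1.id < 10", "t1.id < 10", "t1.int_col = 1"));
    removeDuplicates(conjuncts);
    System.out.println(conjuncts);
  }
}
```

This approach is quadratic in the list length, which is usually
acceptable for the short conjunct lists of a single plan node. It relies
on equals() alone, so no hashCode() contract is needed.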

Change-Id: I774d13a13ad1e8fe82512df98dc29983bdd232eb
Reviewed-on: http://gerrit.cloudera.org:8080/4960
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Internal Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f8377543
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f8377543
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f8377543

Branch: refs/heads/master
Commit: f8377543778b654336c978a4bb97efa3c1847441
Parents: b656f57
Author: Alex Behm <[email protected]>
Authored: Fri Nov 4 10:41:25 2016 -0700
Committer: Internal Jenkins <[email protected]>
Committed: Tue Dec 6 07:24:01 2016 +0000

----------------------------------------------------------------------
 .../org/apache/impala/analysis/Analyzer.java    | 29 ++++++++++++--------
 .../java/org/apache/impala/analysis/Expr.java   | 19 ++-----------
 .../org/apache/impala/analysis/SelectStmt.java  |  2 +-
 .../org/apache/impala/planner/HdfsScanNode.java |  2 +-
 .../impala/planner/SingleNodePlanner.java       |  2 +-
 .../queries/PlannerTest/outer-joins.test        | 29 ++++++++++++++++++++
 6 files changed, 52 insertions(+), 31 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 4819342..6bba436 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -1223,10 +1223,11 @@ public class Analyzer {
   }

   /**
-   * Returns true if e must be evaluated by a join node. Note that it may still be
-   * safe to evaluate e elsewhere as well, but in any case the join must evaluate e.
+   * Returns true if 'e' must be evaluated after or by a join node. Note that it may
+   * still be safe to evaluate 'e' elsewhere as well, but in any case 'e' must be
+   * evaluated again by or after a join.
    */
-  public boolean evalByJoin(Expr e) {
+  public boolean evalAfterJoin(Expr e) {
     List<TupleId> tids = Lists.newArrayList();
     e.getIds(tids, null);
     if (tids.isEmpty()) return false;
@@ -1555,18 +1556,22 @@ public class Analyzer {
           }
         }

-        // Check if either srcConjunct or the generated predicate needs to be evaluated
-        // at a join node (IMPALA-2018).
-        boolean evalByJoin =
-            (evalByJoin(srcConjunct)
-            && (globalState_.ojClauseByConjunct.get(srcConjunct.getId())
-                != globalState_.outerJoinedTupleIds.get(srcTid)))
-            || (evalByJoin(p)
+        // IMPALA-2018/4379: Check if srcConjunct or the generated predicate need to
+        // be evaluated again at a later point in the plan, e.g., by a join that makes
+        // referenced tuples nullable. The first condition is conservative but takes
+        // into account that On-clause conjuncts can sometimes be legitimately assigned
+        // below their originating join.
+        boolean evalAfterJoin =
+            (hasOuterJoinedTuple && !srcConjunct.isOnClauseConjunct_)
+            || (evalAfterJoin(srcConjunct)
+                && (globalState_.ojClauseByConjunct.get(srcConjunct.getId())
+                  != globalState_.outerJoinedTupleIds.get(srcTid)))
+            || (evalAfterJoin(p)
                 && (globalState_.ojClauseByConjunct.get(p.getId())
-                != globalState_.outerJoinedTupleIds.get(destTid)));
+                  != globalState_.outerJoinedTupleIds.get(destTid)));

         // mark all bound predicates including duplicate ones
-        if (reverseValueTransfer && !evalByJoin) markConjunctAssigned(srcConjunct);
+        if (reverseValueTransfer && !evalAfterJoin) markConjunctAssigned(srcConjunct);
       }

       // check if we already created this predicate

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 779e252..87a2a12 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -851,22 +851,9 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
    */
   public static <C extends Expr> void removeDuplicates(List<C> l) {
     if (l == null) return;
-    ListIterator<C> it1 = l.listIterator();
-    while (it1.hasNext()) {
-      C e1 = it1.next();
-      ListIterator<C> it2 = l.listIterator();
-      boolean duplicate = false;
-      while (it2.hasNext()) {
-        C e2 = it2.next();
-          // only check up to but excluding e1
-        if (e1 == e2) break;
-        if (e1.equals(e2)) {
-          duplicate = true;
-          break;
-        }
-      }
-      if (duplicate) it1.remove();
-    }
+    List<C> origList = Lists.newArrayList(l);
+    l.clear();
+    for (C expr: origList) if (!l.contains(expr)) l.add(expr);
   }

   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 80ffde5..74399d0 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -333,7 +333,7 @@ public class SelectStmt extends QueryStmt {
           analyzer.getUnassignedConjuncts(getTableRefIds(), true);
       List<Expr> unassignedJoinConjuncts = Lists.newArrayList();
       for (Expr e: unassigned) {
-        if (analyzer.evalByJoin(e)) unassignedJoinConjuncts.add(e);
+        if (analyzer.evalAfterJoin(e)) unassignedJoinConjuncts.add(e);
       }
       List<Expr> baseTblJoinConjuncts =
           Expr.substituteList(unassignedJoinConjuncts, baseTblSmap_, analyzer, false);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 9642b97..d2eff19 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -292,7 +292,7 @@ public class HdfsScanNode extends ScanNode {
     // Mark those conjuncts as assigned that do not also need to be evaluated by a
     // subsequent semi or outer join.
     for (Expr conjunct: collectionConjuncts) {
-      if (!analyzer.evalByJoin(conjunct)) analyzer.markConjunctAssigned(conjunct);
+      if (!analyzer.evalAfterJoin(conjunct)) analyzer.markConjunctAssigned(conjunct);
     }
     if (!collectionConjuncts.isEmpty()) {
       analyzer.materializeSlots(collectionConjuncts);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 4bc8a88..3687a17 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -1195,7 +1195,6 @@ public class SingleNodePlanner {
    */
   private PlanNode createHdfsScanPlan(TableRef hdfsTblRef,
       boolean fastPartitionKeyScans, Analyzer analyzer) throws ImpalaException {
-    HdfsTable hdfsTable = (HdfsTable)hdfsTblRef.getTable();
     TupleDescriptor tupleDesc = hdfsTblRef.getDesc();

     // Get all predicates bound by the tuple.
@@ -1208,6 +1207,7 @@ public class SingleNodePlanner {
     analyzer.markConjunctsAssigned(unassigned);

     analyzer.createEquivConjuncts(tupleDesc.getId(), conjuncts);
+    Expr.removeDuplicates(conjuncts);

     // Do partition pruning before deciding which slots to materialize,
     // We might end up removing some predicates.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f8377543/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
index 2d5d6cd..95d16f8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/outer-joins.test
@@ -919,3 +919,32 @@ PLAN-ROOT SINK
    partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> b.int_col
 ====
+# IMPALA-3167: Test correct assignment of a WHERE-clause predicate through an inline view
+# that has a grouping aggregation and an outer join. The predicate can be assigned at the
+# scan on the nullable side of the outer join, but it must also be evaluated after the join.
+select v2.id, v2.s
+from (select v1.id, sum(bigint_col) s
+      from functional.alltypes t1
+      left outer join (select t2.int_col, t2.id
+                       from functional.alltypessmall t2) v1
+      on t1.int_col = v1.int_col
+      group by v1.id) v2
+where v2.id < 10
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: sum(bigint_col)
+|  group by: t2.id
+|  having: v1.id < 10
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: t1.int_col = t2.int_col
+|
+|--01:SCAN HDFS [functional.alltypessmall t2]
+|     partitions=4/4 files=4 size=6.32KB
+|     predicates: t2.id < 10
+|
+00:SCAN HDFS [functional.alltypes t1]
+   partitions=24/24 files=24 size=478.45KB
+====
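
The test comment above states that the predicate can be assigned at the
scan on the nullable side of the outer join but must also be evaluated
after the join. The toy model below is a rough sketch of why, under
simplifying assumptions: rows are plain integer arrays, the grouping
aggregation is left out, and the names OuterJoinFilterSketch,
filterRight, leftOuterJoin and filterJoined are hypothetical. It does
not use or reproduce any Impala planner code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of "filter at the scan, then filter again
// after the LEFT OUTER JOIN"; not Impala code.
class OuterJoinFilterSketch {
  // Scan-side filter: keeps right rows {joinKey, id} with id < limit.
  static List<int[]> filterRight(List<int[]> rightRows, int limit) {
    List<int[]> out = new ArrayList<>();
    for (int[] r : rightRows) {
      if (r[1] < limit) out.add(r);
    }
    return out;
  }

  // LEFT OUTER JOIN on joinKey. Output rows are {leftKey, rightId};
  // rightId is null when the left row has no match.
  static List<Integer[]> leftOuterJoin(List<Integer> leftKeys, List<int[]> rightRows) {
    List<Integer[]> out = new ArrayList<>();
    for (Integer k : leftKeys) {
      boolean matched = false;
      for (int[] r : rightRows) {
        if (r[0] == k) {
          out.add(new Integer[] {k, r[1]});
          matched = true;
        }
      }
      if (!matched) out.add(new Integer[] {k, null});
    }
    return out;
  }

  // Post-join filter: "id < limit" is not true for a NULL id, so
  // NULL-padded rows are dropped, as they would be by a WHERE clause.
  static List<Integer[]> filterJoined(List<Integer[]> rows, int limit) {
    List<Integer[]> out = new ArrayList<>();
    for (Integer[] row : rows) {
      if (row[1] != null && row[1] < limit) out.add(row);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> left = List.of(1, 2, 3);
    List<int[]> right = List.of(new int[] {1, 5}, new int[] {2, 50});

    // Filtering only at the scan leaves NULL-padded rows in the join output.
    List<Integer[]> joined = leftOuterJoin(left, filterRight(right, 10));
    System.out.println("rows after join: " + joined.size());

    // Evaluating the predicate again after the join removes them.
    System.out.println("rows after re-check: " + filterJoined(joined, 10).size());
  }
}
```

In this model the scan-side filter only reduces the join input. The
post-join evaluation is what makes the result match the semantics of the
WHERE clause, which corresponds to the 'having: v1.id < 10' line in the
expected plan.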
