IMPALA-5531: Fix correctness issue in correlated aggregate subqueries

This commit fixes an issue where a query returned wrong results if it had an aggregate subquery with a correlated non-equality predicate. Since a correct rewrite for this case is not currently supported, an AnalysisException is now thrown during analysis.
Change-Id: I6ca7b60ef0543430d2f5a802285254ebb52db2ab Reviewed-on: http://gerrit.cloudera.org:8080/7706 Reviewed-by: Dimitris Tsirogiannis <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d03e7d6c Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d03e7d6c Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d03e7d6c Branch: refs/heads/master Commit: d03e7d6ce59343924b9659089dc709225630e0fc Parents: b2ebf3d Author: Dimitris Tsirogiannis <[email protected]> Authored: Tue Aug 15 15:27:03 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Thu Aug 24 10:11:09 2017 +0000 ---------------------------------------------------------------------- .../java/org/apache/impala/analysis/Expr.java | 26 +++++++ .../apache/impala/analysis/StmtRewriter.java | 79 +++++++++++++++----- .../impala/analysis/AnalyzeSubqueriesTest.java | 34 +++++++-- 3 files changed, 115 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d03e7d6c/fe/src/main/java/org/apache/impala/analysis/Expr.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java index fdb166d..774d7b3 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Expr.java +++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java @@ -45,6 +45,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -171,6 +172,16 @@ abstract public class Expr extends 
TreeNode<Expr> implements ParseNode, Cloneabl public boolean apply(Expr arg) { return BinaryPredicate.getEqSlots(arg) != null; } }; + public final static com.google.common.base.Predicate<Expr> IS_NOT_EQ_BINARY_PREDICATE = + new com.google.common.base.Predicate<Expr>() { + @Override + public boolean apply(Expr arg) { + return arg instanceof BinaryPredicate + && ((BinaryPredicate) arg).getOp() != Operator.EQ + && ((BinaryPredicate) arg).getOp() != Operator.NOT_DISTINCT; + } + }; + public final static com.google.common.base.Predicate<Expr> IS_BINARY_PREDICATE = new com.google.common.base.Predicate<Expr>() { @Override @@ -1398,4 +1409,19 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl return DEFAULT_AVG_STRING_LENGTH; } } + + /** + * Generates a comma-separated string from the toSql() string representations of + * 'exprs'. + */ + public static String listToSql(List<Expr> exprs) { + com.google.common.base.Function<Expr, String> toSql = + new com.google.common.base.Function<Expr, String>() { + @Override + public String apply(Expr arg) { + return arg.toSql(); + } + }; + return Joiner.on(",").join(Iterables.transform(exprs, toSql)); + } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d03e7d6c/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java index a53ffce..740b794 100644 --- a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java +++ b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java @@ -18,6 +18,7 @@ package org.apache.impala.analysis; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.impala.analysis.AnalysisContext.AnalysisResult; @@ -336,9 +337,8 @@ public class StmtRewriter { // Extract all correlated predicates from the subquery. 
List<Expr> onClauseConjuncts = extractCorrelatedPredicates(subqueryStmt); if (!onClauseConjuncts.isEmpty()) { - canRewriteCorrelatedSubquery(expr, onClauseConjuncts); - // For correlated subqueries that are eligible for rewrite by transforming - // into a join, a LIMIT clause has no effect on the results, so we can + validateCorrelatedSubqueryStmt(expr); + // For correlated subqueries, a LIMIT clause has no effect on the results, so we can // safely remove it. subqueryStmt.limitElement_ = new LimitElement(null, null); } @@ -395,6 +395,12 @@ public class StmtRewriter { if (joinConjunct != null) onClauseConjuncts.add(joinConjunct); } + // Ensure that all the extracted correlated predicates can be added to the ON-clause + // of the generated join. + if (!onClauseConjuncts.isEmpty()) { + validateCorrelatedPredicates(expr, inlineView, onClauseConjuncts); + } + // Create the ON clause from the extracted correlated predicates. Expr onClausePredicate = CompoundPredicate.createConjunctivePredicate(onClauseConjuncts); @@ -634,15 +640,12 @@ public class StmtRewriter { /** * Checks if an expr containing a correlated subquery is eligible for rewrite by - * tranforming into a join. 'correlatedPredicates' contains the correlated - * predicates identified in the subquery. Throws an AnalysisException if 'expr' - * is not eligible for rewrite. + * transforming into a join. Throws an AnalysisException if 'expr' is not eligible for + * rewrite. * TODO: Merge all the rewrite eligibility tests into a single function.
*/ - private static void canRewriteCorrelatedSubquery(Expr expr, - List<Expr> correlatedPredicates) throws AnalysisException { + private static void validateCorrelatedSubqueryStmt(Expr expr) throws AnalysisException { Preconditions.checkNotNull(expr); - Preconditions.checkNotNull(correlatedPredicates); Preconditions.checkState(expr.contains(Subquery.class)); SelectStmt stmt = (SelectStmt) expr.getSubquery().getStatement(); Preconditions.checkNotNull(stmt); @@ -655,6 +658,30 @@ public class StmtRewriter { "and/or aggregation: " + stmt.toSql()); } + // The following correlated subqueries with a limit clause are supported: + // 1. EXISTS subqueries + // 2. Scalar subqueries with aggregation + if (stmt.hasLimit() && + (!(expr instanceof BinaryPredicate) || !stmt.hasAggInfo() || + stmt.selectList_.isDistinct()) && + !(expr instanceof ExistsPredicate)) { + throw new AnalysisException("Unsupported correlated subquery with a " + + "LIMIT clause: " + stmt.toSql()); + } + } + + /** + * Checks if all the 'correlatedPredicates' extracted from the subquery of 'expr' can be + * added to the ON-clause of the join that results from the subquery rewrite. It throws + * an AnalysisException if this is not the case. 'inlineView' is the generated inline + * view that will replace the subquery in the rewritten statement. + */ + private static void validateCorrelatedPredicates(Expr expr, InlineViewRef inlineView, + List<Expr> correlatedPredicates) throws AnalysisException { + Preconditions.checkNotNull(expr); + Preconditions.checkNotNull(correlatedPredicates); + Preconditions.checkState(inlineView.isAnalyzed()); + SelectStmt stmt = (SelectStmt) expr.getSubquery().getStatement(); final com.google.common.base.Predicate<Expr> isSingleSlotRef = new com.google.common.base.Predicate<Expr>() { @Override @@ -673,15 +700,31 @@ public class StmtRewriter { "HAVING clause: " + stmt.toSql()); } - // The following correlated subqueries with a limit clause are supported: - // 1.
EXISTS subqueries - // 2. Scalar subqueries with aggregation - if (stmt.hasLimit() && - (!(expr instanceof BinaryPredicate) || !stmt.hasAggInfo() || - stmt.selectList_.isDistinct()) && - !(expr instanceof ExistsPredicate)) { - throw new AnalysisException("Unsupported correlated subquery with a " + - "LIMIT clause: " + stmt.toSql()); + // We only support equality correlated predicates in aggregate subqueries + // (see IMPALA-5531). This check needs to be performed after the inline view + // has been analyzed to make sure we don't incorrectly reject non-equality correlated + // predicates from nested collections. + if (expr instanceof BinaryPredicate && !inlineView.isCorrelated() + && !correlatedPredicates.isEmpty()) { + final List<TupleId> subqueryTblIds = stmt.getTableRefIds(); + final com.google.common.base.Predicate<Expr> isBoundBySubqueryTids = + new com.google.common.base.Predicate<Expr>() { + @Override + public boolean apply(Expr arg) { + List<TupleId> tids = Lists.newArrayList(); + arg.getIds(tids, null); + return !Collections.disjoint(tids, subqueryTblIds); + } + }; + + List<Expr> unsupportedPredicates = Lists.newArrayList(Iterables.filter( + correlatedPredicates, Predicates.and(Expr.IS_NOT_EQ_BINARY_PREDICATE, + isBoundBySubqueryTids))); + if (!unsupportedPredicates.isEmpty()) { + throw new AnalysisException("Unsupported aggregate subquery with " + + "non-equality correlated predicates: " + + Expr.listToSql(unsupportedPredicates)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d03e7d6c/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java index 329be5c..dd710cd 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java +++ 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java @@ -732,12 +732,26 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest { AnalyzesOk(String.format("select count(*) from functional.alltypes a where " + "id %s (select %s from functional.alltypestiny t where t.bool_col = false " + "and a.int_col = t.int_col) and a.bigint_col < 10", cmpOp, aggFn)); + AnalyzesOk(String.format("select count(*) from functional.alltypes a where " + + "id %s (select %s from functional.alltypestiny t where " + + "t.int_col = a.int_col and a.id < 10)", cmpOp, aggFn)); // TODO: The rewrite of this query is correct, but could be improved by using a // semi join instead of an outer join. - AnalyzesOk(String.format( - "select id from functional.allcomplextypes t where id %s " + - "(select %s from (select f1 as id, f2 from t.struct_array_col) v " + + AnalyzesOk(String.format("select id from functional.allcomplextypes t where id " + + " %s (select %s from (select f1 as id, f2 from t.struct_array_col) v " + "where t.int_struct_col.f1 < v.id)", cmpOp, aggFn)); + // Correlated with inequality predicate + AnalysisError(String.format("select id from functional.alltypes t1 where " + + "id %s (select %s from functional.alltypestiny t2 where " + + "t1.int_col = t2.int_col and t1.tinyint_col < t2.tinyint_col)", cmpOp, aggFn), + String.format("Unsupported aggregate subquery with non-equality " + + "correlated predicates: t1.tinyint_col < t2.tinyint_col", aggFn)); + AnalysisError(String.format("select id from functional.alltypes t1 where " + + "id %s (select %s from functional.alltypestiny t2 where " + + "t1.int_col = t2.int_col and t1.tinyint_col + 1 < t2.tinyint_col - 1)", cmpOp, + aggFn), String.format("Unsupported aggregate subquery with non-equality " + + "correlated predicates: t1.tinyint_col + 1 < t2.tinyint_col - 1", + aggFn)); // Correlated with constant expr AnalyzesOk(String.format("select count(*) from functional.alltypes a where " + "10 %s (select %s from 
functional.alltypestiny t where t.bool_col = false " + @@ -828,9 +842,17 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest { String.format("operands of type INT and TIMESTAMP are not comparable: " + "int_col %s (SELECT max(timestamp_col) FROM functional.alltypessmall)", cmpOp)); // Distinct in the outer select block - AnalyzesOk(String.format("select distinct id from functional.alltypes a " + - "where 100 %s (select count(*) from functional.alltypesagg g where " + - "a.int_col %s g.int_col) and a.bool_col = false", cmpOp, cmpOp)); + if ("=".equals(cmpOp)) { + AnalyzesOk(String.format("select distinct id from functional.alltypes a " + + "where 100 %s (select count(*) from functional.alltypesagg g where " + + "a.int_col %s g.int_col) and a.bool_col = false", cmpOp, cmpOp)); + } else { + AnalysisError(String.format("select distinct id from functional.alltypes a " + + "where 100 %s (select count(*) from functional.alltypesagg g where " + + "a.int_col %s g.int_col) and a.bool_col = false", cmpOp, cmpOp), + String.format("Unsupported aggregate subquery with non-equality " + + "correlated predicates: a.int_col %s g.int_col", cmpOp)); + } } // Subquery returns multiple rows
