IMPALA-5531: Fix correctness issue in correlated aggregate subqueries

This commit fixes an issue where a query returned wrong results if it
had an aggregate subquery with a correlated non-equality predicate.
Since the rewrite for this case is not currently supported, an
AnalysisException is now thrown during analysis.

Change-Id: I6ca7b60ef0543430d2f5a802285254ebb52db2ab
Reviewed-on: http://gerrit.cloudera.org:8080/7706
Reviewed-by: Dimitris Tsirogiannis <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d03e7d6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d03e7d6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d03e7d6c

Branch: refs/heads/master
Commit: d03e7d6ce59343924b9659089dc709225630e0fc
Parents: b2ebf3d
Author: Dimitris Tsirogiannis <[email protected]>
Authored: Tue Aug 15 15:27:03 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Aug 24 10:11:09 2017 +0000

----------------------------------------------------------------------
 .../java/org/apache/impala/analysis/Expr.java   | 26 +++++++
 .../apache/impala/analysis/StmtRewriter.java    | 79 +++++++++++++++-----
 .../impala/analysis/AnalyzeSubqueriesTest.java  | 34 +++++++--
 3 files changed, 115 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d03e7d6c/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java 
b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index fdb166d..774d7b3 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -45,6 +45,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -171,6 +172,16 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
         public boolean apply(Expr arg) { return 
BinaryPredicate.getEqSlots(arg) != null; }
       };
 
+  public final static com.google.common.base.Predicate<Expr> 
IS_NOT_EQ_BINARY_PREDICATE =
+      new com.google.common.base.Predicate<Expr>() {
+        @Override
+        public boolean apply(Expr arg) {
+          return arg instanceof BinaryPredicate
+              && ((BinaryPredicate) arg).getOp() != Operator.EQ
+              && ((BinaryPredicate) arg).getOp() != Operator.NOT_DISTINCT;
+        }
+      };
+
   public final static com.google.common.base.Predicate<Expr> 
IS_BINARY_PREDICATE =
       new com.google.common.base.Predicate<Expr>() {
         @Override
@@ -1398,4 +1409,19 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
       return DEFAULT_AVG_STRING_LENGTH;
     }
   }
+
+  /**
+   * Returns the toSql() representations of 'exprs' joined by "," with no spaces
+   * around the separator. An empty list yields the empty string.
+   */
+  public static String listToSql(List<Expr> exprs) {
+    com.google.common.base.Function<Expr, String> toSql =
+        new com.google.common.base.Function<Expr, String>() {
+        @Override
+        public String apply(Expr arg) {
+          return arg.toSql();
+        }
+    };
+    return Joiner.on(",").join(Iterables.transform(exprs, toSql));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d03e7d6c/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java 
b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
index a53ffce..740b794 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.impala.analysis.AnalysisContext.AnalysisResult;
@@ -336,9 +337,8 @@ public class StmtRewriter {
     // Extract all correlated predicates from the subquery.
     List<Expr> onClauseConjuncts = extractCorrelatedPredicates(subqueryStmt);
     if (!onClauseConjuncts.isEmpty()) {
-      canRewriteCorrelatedSubquery(expr, onClauseConjuncts);
-      // For correlated subqueries that are eligible for rewrite by 
transforming
-      // into a join, a LIMIT clause has no effect on the results, so we can
+      validateCorrelatedSubqueryStmt(expr);
+      // For correlated subqueries, a LIMIT clause has no effect on the 
results, so we can
       // safely remove it.
       subqueryStmt.limitElement_ = new LimitElement(null, null);
     }
@@ -395,6 +395,12 @@ public class StmtRewriter {
       if (joinConjunct != null) onClauseConjuncts.add(joinConjunct);
     }
 
+    // Ensure that all the extracted correlated predicates can be added to the 
ON-clause
+    // of the generated join.
+    if (!onClauseConjuncts.isEmpty()) {
+      validateCorrelatedPredicates(expr, inlineView, onClauseConjuncts);
+    }
+
     // Create the ON clause from the extracted correlated predicates.
     Expr onClausePredicate =
         CompoundPredicate.createConjunctivePredicate(onClauseConjuncts);
@@ -634,15 +640,12 @@ public class StmtRewriter {
 
   /**
    * Checks if an expr containing a correlated subquery is eligible for 
rewrite by
-   * tranforming into a join. 'correlatedPredicates' contains the correlated
-   * predicates identified in the subquery. Throws an AnalysisException if 
'expr'
-   * is not eligible for rewrite.
+   * transforming into a join. Throws an AnalysisException if 'expr' is not 
eligible for
+   * rewrite. Correlated predicates are checked by validateCorrelatedPredicates().
    * TODO: Merge all the rewrite eligibility tests into a single function.
    */
-  private static void canRewriteCorrelatedSubquery(Expr expr,
-      List<Expr> correlatedPredicates) throws AnalysisException {
+  private static void validateCorrelatedSubqueryStmt(Expr expr) throws 
AnalysisException {
     Preconditions.checkNotNull(expr);
-    Preconditions.checkNotNull(correlatedPredicates);
     Preconditions.checkState(expr.contains(Subquery.class));
     SelectStmt stmt = (SelectStmt) expr.getSubquery().getStatement();
     Preconditions.checkNotNull(stmt);
@@ -655,6 +658,30 @@ public class StmtRewriter {
           "and/or aggregation: " + stmt.toSql());
     }
 
+    // The following correlated subqueries with a limit clause are supported:
+    // 1. EXISTS subqueries
+    // 2. Scalar subqueries with aggregation
+    if (stmt.hasLimit() &&
+        (!(expr instanceof BinaryPredicate) || !stmt.hasAggInfo() ||
+         stmt.selectList_.isDistinct()) &&
+        !(expr instanceof ExistsPredicate)) {
+      throw new AnalysisException("Unsupported correlated subquery with a " +
+          "LIMIT clause: " + stmt.toSql());
+    }
+  }
+
+  /**
+   * Checks if all the 'correlatedPredicates' extracted from the subquery of 
'expr' can be
+   * added to the ON-clause of the join that results from the subquery 
rewrite. It throws
+   * an AnalysisException if this is not the case. 'inlineView' is the 
generated inline
+   * view that will replace the subquery in the rewritten statement.
+   */
+  private static void validateCorrelatedPredicates(Expr expr, InlineViewRef 
inlineView,
+      List<Expr> correlatedPredicates) throws AnalysisException {
+    Preconditions.checkNotNull(expr);
+    Preconditions.checkNotNull(correlatedPredicates);
+    Preconditions.checkState(inlineView.isAnalyzed());
+    SelectStmt stmt = (SelectStmt) expr.getSubquery().getStatement();
     final com.google.common.base.Predicate<Expr> isSingleSlotRef =
         new com.google.common.base.Predicate<Expr>() {
       @Override
@@ -673,15 +700,31 @@ public class StmtRewriter {
           "HAVING clause: " + stmt.toSql());
     }
 
-    // The following correlated subqueries with a limit clause are supported:
-    // 1. EXISTS subqueries
-    // 2. Scalar subqueries with aggregation
-    if (stmt.hasLimit() &&
-        (!(expr instanceof BinaryPredicate) || !stmt.hasAggInfo() ||
-         stmt.selectList_.isDistinct()) &&
-        !(expr instanceof ExistsPredicate)) {
-      throw new AnalysisException("Unsupported correlated subquery with a " +
-          "LIMIT clause: " + stmt.toSql());
+    // We only support equality correlated predicates in aggregate subqueries
+    // (see IMPALA-5531). This check needs to be performed after the inline 
view
+    // has been analyzed to make sure we don't incorrectly reject non-equality 
correlated
+    // predicates from nested collections.
+    if (expr instanceof BinaryPredicate && !inlineView.isCorrelated()
+        && !correlatedPredicates.isEmpty()) {
+      final List<TupleId> subqueryTblIds = stmt.getTableRefIds();
+      final com.google.common.base.Predicate<Expr> isBoundBySubqueryTids =
+          new com.google.common.base.Predicate<Expr>() {
+            @Override
+            public boolean apply(Expr arg) {
+              List<TupleId> tids = Lists.newArrayList();
+              arg.getIds(tids, null);
+              return !Collections.disjoint(tids, subqueryTblIds);
+            }
+        };
+
+      List<Expr> unsupportedPredicates = Lists.newArrayList(Iterables.filter(
+          correlatedPredicates, Predicates.and(Expr.IS_NOT_EQ_BINARY_PREDICATE,
+              isBoundBySubqueryTids)));
+      if (!unsupportedPredicates.isEmpty()) {
+        throw new AnalysisException("Unsupported aggregate subquery with " +
+            "non-equality correlated predicates: " +
+            Expr.listToSql(unsupportedPredicates));
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d03e7d6c/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
----------------------------------------------------------------------
diff --git 
a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java 
b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
index 329be5c..dd710cd 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeSubqueriesTest.java
@@ -732,12 +732,26 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
         AnalyzesOk(String.format("select count(*) from functional.alltypes a 
where " +
             "id %s (select %s from functional.alltypestiny t where t.bool_col 
= false " +
             "and a.int_col = t.int_col) and a.bigint_col < 10", cmpOp, aggFn));
+        AnalyzesOk(String.format("select count(*) from functional.alltypes a 
where " +
+            "id %s (select %s from functional.alltypestiny t where " +
+            "t.int_col = a.int_col and a.id < 10)", cmpOp, aggFn));
         // TODO: The rewrite of this query is correct, but could be improved 
by using a
         // semi join instead of an outer join.
-        AnalyzesOk(String.format(
-            "select id from functional.allcomplextypes t where id %s " +
-            "(select %s from (select f1 as id, f2 from t.struct_array_col) v " 
+
+        AnalyzesOk(String.format("select id from functional.allcomplextypes t 
where id " +
+            " %s (select %s from (select f1 as id, f2 from t.struct_array_col) 
v " +
             "where t.int_struct_col.f1 < v.id)", cmpOp, aggFn));
+        // Correlated with a non-equality predicate: rejected (IMPALA-5531)
+        AnalysisError(String.format("select id from functional.alltypes t1 
where " +
+            "id %s (select %s from functional.alltypestiny t2 where " +
+            "t1.int_col = t2.int_col and t1.tinyint_col < t2.tinyint_col)", 
cmpOp, aggFn),
+            "Unsupported aggregate subquery with non-equality " +
+            "correlated predicates: t1.tinyint_col < t2.tinyint_col");
+        AnalysisError(String.format("select id from functional.alltypes t1 
where " +
+            "id %s (select %s from functional.alltypestiny t2 where " +
+            "t1.int_col = t2.int_col and t1.tinyint_col + 1 < t2.tinyint_col - 
1)", cmpOp,
+            aggFn), "Unsupported aggregate subquery with non-equality " +
+            "correlated predicates: " +
+            "t1.tinyint_col + 1 < t2.tinyint_col - 1");
         // Correlated with constant expr
         AnalyzesOk(String.format("select count(*) from functional.alltypes a 
where " +
             "10 %s (select %s from functional.alltypestiny t where t.bool_col 
= false " +
@@ -828,9 +842,17 @@ public class AnalyzeSubqueriesTest extends AnalyzerTest {
           String.format("operands of type INT and TIMESTAMP are not 
comparable: " +
           "int_col %s (SELECT max(timestamp_col) FROM 
functional.alltypessmall)", cmpOp));
       // Distinct in the outer select block
-      AnalyzesOk(String.format("select distinct id from functional.alltypes a 
" +
-          "where 100 %s (select count(*) from functional.alltypesagg g where " 
+
-          "a.int_col %s g.int_col) and a.bool_col = false", cmpOp, cmpOp));
+      if (cmpOp.equals("=")) {  // Only equality correlation is supported.
+        AnalyzesOk(String.format("select distinct id from functional.alltypes 
a " +
+            "where 100 %s (select count(*) from functional.alltypesagg g where 
" +
+            "a.int_col %s g.int_col) and a.bool_col = false", cmpOp, cmpOp));
+      } else {
+        AnalysisError(String.format("select distinct id from 
functional.alltypes a " +
+            "where 100 %s (select count(*) from functional.alltypesagg g where 
" +
+            "a.int_col %s g.int_col) and a.bool_col = false", cmpOp, cmpOp),
+            String.format("Unsupported aggregate subquery with non-equality " +
+            "correlated predicates: a.int_col %s g.int_col", cmpOp));
+      }
     }
 
     // Subquery returns multiple rows

Reply via email to