Repository: spark
Updated Branches:
  refs/heads/master f8878a4c6 -> 360063521


[SPARK-18614][SQL] Incorrect predicate pushdown from ExistenceJoin

## What changes were proposed in this pull request?

For predicate pushdown, ExistenceJoin should be treated the same as LeftOuter 
and LeftAnti, not as InnerLike and LeftSemi. The bug is not currently exposed 
because the rewrite of [NOT] EXISTS OR ... to ExistenceJoin happens in the rule 
RewritePredicateSubquery, which belongs to a separate rule set placed after the 
rule PushPredicateThroughJoin. As a result, no ExistenceJoin is present in the 
plan when PushPredicateThroughJoin runs.

The semantics of ExistenceJoin require that all rows from the left table are 
preserved through the join operation, as if it were a regular LeftOuter join. 
The ExistenceJoin augments the LeftOuter operation with a new column called 
exists, set to true when the join condition in the ON clause is true and false 
otherwise. Any filtering of rows happens in the Filter operation above the 
ExistenceJoin.
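
The semantics described above can be sketched with plain Scala collections. 
This is only an illustrative model, not Spark code: the object name 
ExistenceJoinModel and the helper existenceJoin are hypothetical, and SQL NULL 
handling is ignored.

```scala
object ExistenceJoinModel {
  // Hypothetical helper (not a Spark API): keeps every left row and appends an
  // `exists` flag that is true when at least one right row satisfies `on`.
  def existenceJoin[L, R](left: Seq[L], right: Seq[R])(on: (L, R) => Boolean): Seq[(L, Boolean)] =
    left.map(l => (l, right.exists(r => on(l, r))))

  def main(args: Array[String]): Unit = {
    val left = Seq(1, 2, 3)
    val right = Seq(2, 3, 4)

    // No left row is dropped by the join itself: (1,false), (2,true), (3,true)
    val joined = existenceJoin(left, right)(_ == _)
    println(joined)

    // Rows are only removed by the filter applied above the join,
    // here modelling "EXISTS (...) OR value = 1": keeps 1, 2 and 3
    val filtered = joined.collect { case (l, exists) if exists || l == 1 => l }
    println(filtered)
  }
}
```

The point of the sketch is that the join step always returns one output row per 
left row, so any predicate that removes left rows below the join can change the 
final result.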

Example:

A(c1, c2): { (1, 1), (1, 2) }
// B can be any value as it is irrelevant in this example
B(c1): { (NULL) }

select A.*
from   A
where  exists (select 1 from B where A.c1 = A.c2)
       or A.c2=2

In this example, the correct result is all the rows from A. If the 
ExistenceJoin case around line 935 in Optimizer.scala were ever reached, the 
code would push down the predicate A.c1 = A.c2 as a Filter on relation A, which 
would incorrectly remove the row (1,2) from A.
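
The example can be replayed with a small Scala model, assuming the same 
simplified view of ExistenceJoin (all left rows kept, plus an exists flag). The 
names PushdownExample, existenceJoin, withoutPushdown and withLeftPushdown are 
hypothetical and exist only for this sketch; it does not run the Catalyst 
optimizer.

```scala
object PushdownExample {
  final case class A(c1: Int, c2: Int)

  // Hypothetical model of ExistenceJoin, not a Spark API.
  def existenceJoin[L, R](left: Seq[L], right: Seq[R])(on: (L, R) => Boolean): Seq[(L, Boolean)] =
    left.map(l => (l, right.exists(r => on(l, r))))

  val tableA: Seq[A] = Seq(A(1, 1), A(1, 2))
  val tableB: Seq[Option[Int]] = Seq(None) // B(c1): { (NULL) }

  // The Filter above the join: exists OR A.c2 = 2
  private def filterAbove(joined: Seq[(A, Boolean)]): Seq[A] =
    joined.collect { case (a, exists) if exists || a.c2 == 2 => a }

  // Predicate A.c1 = A.c2 stays in the join condition: both rows survive.
  def withoutPushdown: Seq[A] =
    filterAbove(existenceJoin(tableA, tableB)((a, _) => a.c1 == a.c2))

  // Predicate pushed down as a Filter on A: row (1,2) is lost before the
  // join, so the "OR A.c2 = 2" branch never gets a chance to keep it.
  def withLeftPushdown: Seq[A] = {
    val pushed = tableA.filter(a => a.c1 == a.c2)
    filterAbove(existenceJoin(pushed, tableB)((_, _) => true))
  }

  def main(args: Array[String]): Unit = {
    println(withoutPushdown)  // both rows of A
    println(withLeftPushdown) // only the row (1,1)
  }
}
```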

## How was this patch tested?

Since this is not an exposed case, no new test cases are added. The scenario 
was discovered via a code review of another PR and confirmed to be valid with a 
peer.

Author: Nattavut Sutyanyong <[email protected]>

Closes #16044 from nsyca/spark-18614.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36006352
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36006352
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36006352

Branch: refs/heads/master
Commit: 3600635215f25d695c9be5931b5185fec8a35527
Parents: f8878a4
Author: Nattavut Sutyanyong <[email protected]>
Authored: Tue Nov 29 15:27:43 2016 -0800
Committer: Herman van Hovell <[email protected]>
Committed: Tue Nov 29 15:27:43 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  4 +-
 .../optimizer/FilterPushdownSuite.scala         | 17 +++++++++
 .../resources/sql-tests/inputs/anti-join.sql    |  7 ----
 .../sql-tests/inputs/pred-pushdown.sql          | 12 ++++++
 .../sql-tests/results/anti-join.sql.out         | 29 --------------
 .../sql-tests/results/pred-pushdown.sql.out     | 40 ++++++++++++++++++++
 6 files changed, 71 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/36006352/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 805cad5..37f0c8e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
         split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
 
       joinType match {
-        case _: InnerLike |  LeftSemi | ExistenceJoin(_) =>
+        case _: InnerLike |  LeftSemi =>
           // push down the single side only join filter for both sides sub queries
           val newLeft = leftJoinConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
@@ -949,7 +949,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
 
           Join(newLeft, newRight, RightOuter, newJoinCond)
-        case LeftOuter | LeftAnti =>
+        case LeftOuter | LeftAnti | ExistenceJoin(_) =>
           // push down the right side only join filter for right sub query
           val newLeft = left
           val newRight = rightJoinConditions.

http://git-wip-us.apache.org/repos/asf/spark/blob/36006352/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 3e67282..6feea40 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -546,6 +546,23 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
   }
 
+  test("joins: only push down join conditions to the right of an existence join") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val fillerVal = 'val.boolean
+    val originalQuery =
+      x.join(y,
+        ExistenceJoin(fillerVal),
+        Some("x.a".attr > 1 && "y.b".attr > 2)).analyze
+    val optimized = Optimize.execute(originalQuery)
+    val correctAnswer =
+      x.join(
+        y.where("y.b".attr > 2),
+        ExistenceJoin(fillerVal),
+        Some("x.a".attr > 1))
+      .analyze
+    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+  }
 
   val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/36006352/sql/core/src/test/resources/sql-tests/inputs/anti-join.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/anti-join.sql b/sql/core/src/test/resources/sql-tests/inputs/anti-join.sql
deleted file mode 100644
index 0346f57..0000000
--- a/sql/core/src/test/resources/sql-tests/inputs/anti-join.sql
+++ /dev/null
@@ -1,7 +0,0 @@
--- SPARK-18597: Do not push down predicates to left hand side in an anti-join
-CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2);
-CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1);
-
-SELECT *
-FROM   tbl_a
-       LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2);

http://git-wip-us.apache.org/repos/asf/spark/blob/36006352/sql/core/src/test/resources/sql-tests/inputs/pred-pushdown.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/pred-pushdown.sql b/sql/core/src/test/resources/sql-tests/inputs/pred-pushdown.sql
new file mode 100644
index 0000000..eff258a
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/pred-pushdown.sql
@@ -0,0 +1,12 @@
+CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2);
+CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1);
+
+-- SPARK-18597: Do not push down predicates to left hand side in an anti-join
+SELECT *
+FROM   tbl_a
+       LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2);
+
+-- SPARK-18614: Do not push down predicates on left table below ExistenceJoin
+SELECT l.c1, l.c2
+FROM   tbl_a l
+WHERE  EXISTS (SELECT 1 FROM tbl_b r WHERE l.c1 = l.c2) OR l.c2 < 2;

http://git-wip-us.apache.org/repos/asf/spark/blob/36006352/sql/core/src/test/resources/sql-tests/results/anti-join.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/anti-join.sql.out b/sql/core/src/test/resources/sql-tests/results/anti-join.sql.out
deleted file mode 100644
index 6f38c4d..0000000
--- a/sql/core/src/test/resources/sql-tests/results/anti-join.sql.out
+++ /dev/null
@@ -1,29 +0,0 @@
--- Automatically generated by SQLQueryTestSuite
--- Number of queries: 3
-
-
--- !query 0
-CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2)
--- !query 0 schema
-struct<>
--- !query 0 output
-
-
-
--- !query 1
-CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1)
--- !query 1 schema
-struct<>
--- !query 1 output
-
-
-
--- !query 2
-SELECT *
-FROM   tbl_a
-       LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2)
--- !query 2 schema
-struct<c1:int,c2:int>
--- !query 2 output
-2      1
-3      6

http://git-wip-us.apache.org/repos/asf/spark/blob/36006352/sql/core/src/test/resources/sql-tests/results/pred-pushdown.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/pred-pushdown.sql.out b/sql/core/src/test/resources/sql-tests/results/pred-pushdown.sql.out
new file mode 100644
index 0000000..1b8ddbe
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/pred-pushdown.sql.out
@@ -0,0 +1,40 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 4
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT *
+FROM   tbl_a
+       LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2)
+-- !query 2 schema
+struct<c1:int,c2:int>
+-- !query 2 output
+2      1
+3      6
+
+
+-- !query 3
+SELECT l.c1, l.c2
+FROM   tbl_a l
+WHERE  EXISTS (SELECT 1 FROM tbl_b r WHERE l.c1 = l.c2) OR l.c2 < 2
+-- !query 3 schema
+struct<c1:int,c2:int>
+-- !query 3 output
+1      1
+2      1

