This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 738db079c0b6 [SPARK-49646][SQL] fix subquery decorrelation for union/set operations when parentOuterReferences has references not covered in collectedChildOuterReferences
738db079c0b6 is described below

commit 738db079c0b65e8305b7a1349923ee017316f691
Author: Avery Qi <[email protected]>
AuthorDate: Mon Sep 16 11:37:37 2024 +0800

    [SPARK-49646][SQL] fix subquery decorrelation for union/set operations when parentOuterReferences has references not covered in collectedChildOuterReferences
    
    ### What changes were proposed in this pull request?
    Fix a bug in lateral joins that occurs when a Union/SetOp appears under a limit or aggregation together with filter predicates that cannot be pulled up directly. For example:
    ```
    create table IF NOT EXISTS t(t1 INT,t2 int) using json;
    CREATE TABLE IF NOT EXISTS a (a1 INT) using json;
    
    select 1
    from t as t_outer
    left join
       lateral(
           select b1,b2
           from
           (
               select
                   a.a1 as b1,
                   1 as b2
               from a
               union
               select t_outer.t1 as b1,
                      null as b2
           ) as t_inner
           where (t_inner.b1 < t_outer.t2  or t_inner.b1 is null) and  t_inner.b1 = t_outer.t1
           order by t_inner.b1,t_inner.b2 desc limit 1
       ) as lateral_table
    ```
    
    ### Why are the changes needed?
    In general, Spark cannot handle this query because:
    1. The decorrelation logic tries to rewrite the limit operator into a Window aggregation and pull up the correlated predicates, and the Union operator is rewritten to have a DomainJoin with outer references within its children.
    2. When the DomainJoin is rewritten into a real join, it needs an attribute reference map, built from the pulled-up correlated predicates, to rewrite the outer references in the DomainJoin. However, each child of a Union/SetOp operator uses different attribute references even when they refer to the same column of the outer table, so the Union/SetOp output and its children's output are needed to map between these references.
    3. Combined with aggregation and filters with inequality comparisons, more outer references remain within the children of the Union operator. These references are not covered by the Union/SetOp output, so there is not enough information to map between the different attribute references within the children of the Union/SetOp operator.
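    
    The gap described in the three points above can be sketched with plain Scala collections. This is a simplified model, not the Catalyst implementation: `Attr`, `DomainProjectionSketch`, and `parentOuterReferences` as used here are hypothetical stand-ins, and it assumes that `newOuterReferences` covers both the parent's and the child's outer references.

```scala
// Hypothetical stand-in for a Catalyst attribute (name plus expression id).
final case class Attr(name: String, exprId: Int)

object DomainProjectionSketch {
  // One extra projected column per outer reference, looked up in the outer-to-inner map.
  def domainProjections(refs: Seq[Attr], outerToInner: Map[Attr, Attr]): Seq[Attr] =
    refs.map(ref => outerToInner(ref))

  // Parent references whose inner counterpart is absent from the projected columns.
  def missing(parentRefs: Seq[Attr], projected: Seq[Attr], outerToInner: Map[Attr, Attr]): Seq[Attr] =
    parentRefs.filterNot(ref => projected.contains(outerToInner(ref)))

  val c1 = Attr("c1", 1)
  val c2 = Attr("c2", 2)

  // Assumption: this Union child only references t_outer.c1 itself.
  val collectedChildOuterReferences: Seq[Attr] = Seq(c1)
  // Assumption: the parent additionally needs t_outer.c2 (from `b1 < t_outer.c2`).
  val parentOuterReferences: Seq[Attr] = Seq(c1, c2)
  val newOuterReferences: Seq[Attr] =
    (parentOuterReferences ++ collectedChildOuterReferences).distinct

  // Fresh inner attributes that stand in for the outer ones inside the child.
  val newOuterReferenceMap: Map[Attr, Attr] =
    Map(c1 -> Attr("c1", 11), c2 -> Attr("c2", 12))

  // Projecting only the child's own references leaves c2 without a column.
  val before: Seq[Attr] = domainProjections(collectedChildOuterReferences, newOuterReferenceMap)
  // Projecting every outer reference gives the parent a column for each one.
  val after: Seq[Attr] = domainProjections(newOuterReferences, newOuterReferenceMap)

  def main(args: Array[String]): Unit = {
    println(s"missing before: ${missing(parentOuterReferences, before, newOuterReferenceMap)}")
    println(s"missing after:  ${missing(parentOuterReferences, after, newOuterReferenceMap)}")
  }
}
```

    In this model, the first projection leaves the parent without a column for `c2`, while the second covers every reference the parent needs.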
    
    For more context, please read this short investigation doc (the link has been changed and is now public):
    
https://docs.google.com/document/d/1_pJIi_8GuLHOXabLEgRy2e7OHw-OIBnWbwGwSkwIcxg/edit?usp=sharing
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The bug is fixed, and the above query can now be handled without error.
    
    ### How was this patch tested?
    
    Added a unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48109 from averyqi-db/averyqi-db/SPARK-49646.
    
    Authored-by: Avery Qi <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../catalyst/optimizer/DecorrelateInnerQuery.scala |  2 +-
 .../analyzer-results/join-lateral.sql.out          | 47 ++++++++++++++++++++++
 .../resources/sql-tests/inputs/join-lateral.sql    | 21 ++++++++++
 .../sql-tests/results/join-lateral.sql.out         | 27 +++++++++++++
 4 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index 424f4b96271d..6c0d7189862d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -1064,7 +1064,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
                 // Project, they could get added at the beginning or the end of the output columns
                 // depending on the child plan.
                 // The inner expressions for the domain are the values of newOuterReferenceMap.
-                val domainProjections = collectedChildOuterReferences.map(newOuterReferenceMap(_))
+                val domainProjections = newOuterReferences.map(newOuterReferenceMap(_))
                 val newChild = Project(child.output ++ domainProjections, decorrelatedChild)
                 (newChild, newJoinCond, newOuterReferenceMap)
               }
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
index e81ee769f57d..5bf893605423 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
@@ -3017,6 +3017,53 @@ Project [c1#x, c2#x, t#x]
             +- LocalRelation [col1#x, col2#x]
 
 
+-- !query
+select 1
+from t1 as t_outer
+left join
+ lateral(
+     select b1,b2
+     from
+     (
+         select
+             t2.c1 as b1,
+             1 as b2
+         from t2
+         union
+         select t_outer.c1 as b1,
+                null as b2
+     ) as t_inner
+     where (t_inner.b1 < t_outer.c2  or t_inner.b1 is null)
+      and  t_inner.b1 = t_outer.c1
+     order by t_inner.b1,t_inner.b2 desc limit 1
+ ) as lateral_table
+-- !query analysis
+Project [1 AS 1#x]
++- LateralJoin lateral-subquery#x [c2#x && c1#x && c1#x], LeftOuter
+   :  +- SubqueryAlias lateral_table
+   :     +- GlobalLimit 1
+   :        +- LocalLimit 1
+   :           +- Sort [b1#x ASC NULLS FIRST, b2#x DESC NULLS LAST], true
+   :              +- Project [b1#x, b2#x]
+   :                 +- Filter (((b1#x < outer(c2#x)) OR isnull(b1#x)) AND (b1#x = outer(c1#x)))
+   :                    +- SubqueryAlias t_inner
+   :                       +- Distinct
+   :                          +- Union false, false
+   :                             :- Project [c1#x AS b1#x, 1 AS b2#x]
+   :                             :  +- SubqueryAlias spark_catalog.default.t2
+   :                             :     +- View (`spark_catalog`.`default`.`t2`, [c1#x, c2#x])
+   :                             :        +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+   :                             :           +- LocalRelation [col1#x, col2#x]
+   :                             +- Project [b1#x, cast(b2#x as int) AS b2#x]
+   :                                +- Project [outer(c1#x) AS b1#x, null AS b2#x]
+   :                                   +- OneRowRelation
+   +- SubqueryAlias t_outer
+      +- SubqueryAlias spark_catalog.default.t1
+         +- View (`spark_catalog`.`default`.`t1`, [c1#x, c2#x])
+            +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+               +- LocalRelation [col1#x, col2#x]
+
+
 -- !query
 DROP VIEW t1
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
index 8bff1f109aa6..e3cef9207d20 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
@@ -531,6 +531,27 @@ select * from t1 join lateral
    (select t4.c1 as t from t4 where t1.c1 = t4.c1)) as foo
    order by foo.t limit 5);
 
+
+select 1
+from t1 as t_outer
+left join
+ lateral(
+     select b1,b2
+     from
+     (
+         select
+             t2.c1 as b1,
+             1 as b2
+         from t2
+         union
+         select t_outer.c1 as b1,
+                null as b2
+     ) as t_inner
+     where (t_inner.b1 < t_outer.c2  or t_inner.b1 is null)
+      and  t_inner.b1 = t_outer.c1
+     order by t_inner.b1,t_inner.b2 desc limit 1
+ ) as lateral_table;
+
 -- clean up
 DROP VIEW t1;
 DROP VIEW t2;
diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
index ced8d6398a66..11bafb2cf63c 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
@@ -1878,6 +1878,33 @@ struct<c1:int,c2:int,t:int>
 1      2       3
 
 
+-- !query
+select 1
+from t1 as t_outer
+left join
+ lateral(
+     select b1,b2
+     from
+     (
+         select
+             t2.c1 as b1,
+             1 as b2
+         from t2
+         union
+         select t_outer.c1 as b1,
+                null as b2
+     ) as t_inner
+     where (t_inner.b1 < t_outer.c2  or t_inner.b1 is null)
+      and  t_inner.b1 = t_outer.c1
+     order by t_inner.b1,t_inner.b2 desc limit 1
+ ) as lateral_table
+-- !query schema
+struct<1:int>
+-- !query output
+1
+1
+
+
 -- !query
 DROP VIEW t1
 -- !query schema

