This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 738db079c0b6 [SPARK-49646][SQL] fix subquery decorrelation for
union/set operations when parentOuterReferences has references not covered in
collectedChildOuterReferences
738db079c0b6 is described below
commit 738db079c0b65e8305b7a1349923ee017316f691
Author: Avery Qi <[email protected]>
AuthorDate: Mon Sep 16 11:37:37 2024 +0800
[SPARK-49646][SQL] fix subquery decorrelation for union/set operations when
parentOuterReferences has references not covered in
collectedChildOuterReferences
### What changes were proposed in this pull request?
Fix a bug that occurs when a Union/SetOp sits under a Limit/Aggregate in a lateral join and the filter predicates cannot be pulled up directly, e.g.:
```
CREATE TABLE IF NOT EXISTS t (t1 INT, t2 INT) USING json;
CREATE TABLE IF NOT EXISTS a (a1 INT) USING json;
select 1
from t as t_outer
left join
  lateral(
    select b1, b2
    from
    (
      select
        a.a1 as b1,
        1 as b2
      from a
      union
      select t_outer.t1 as b1,
        null as b2
    ) as t_inner
    where (t_inner.b1 < t_outer.t2 or t_inner.b1 is null) and
      t_inner.b1 = t_outer.t1
    order by t_inner.b1, t_inner.b2 desc limit 1
  ) as lateral_table;
```
### Why are the changes needed?
Before this fix, Spark could not handle this query because:
1. The decorrelation logic rewrites the Limit operator into a Window aggregation and pulls up the correlated predicates, and the Union operator is rewritten so that its children contain a DomainJoin with outer references.
2. When a DomainJoin is rewritten into an actual join, Spark needs an attribute reference map, built from the pulled-up correlated predicates, to rewrite the outer references in the DomainJoin. However, each child of a Union/SetOp operator uses different attribute references even when they refer to the same column of the outer table, so the Union/SetOp output and its children's output are needed to map between these references.
3. When aggregation is combined with filters that use inequality comparisons, more outer references remain within the children of the Union operator. These references are not covered by the Union/SetOp output, so there is not enough information to map the different attribute references within the children of the Union/SetOp operator.
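Points 2 and 3 can be illustrated with a small, self-contained toy model. This is a hypothetical sketch, not Catalyst code: `Attr` stands in for an attribute reference with an expression ID, and `toUnionOutput` stands in for the positional map from a Union child's output to the Union output. The same outer column `c1` is reachable from both children through different IDs, while an outer reference that remains inside a child but is not covered by the Union output (modeled here as `c2`, the column used in `t_inner.b1 < t_outer.c2`) has no entry in the map.

```scala
// Hypothetical stand-in for an attribute reference: a name plus an expression ID.
final case class Attr(name: String, exprId: Int)

object UnionRefMapping {
  // Union output: b1, b2, plus one domain column for the outer column c1.
  val unionOutput: Seq[Attr] = Seq(Attr("b1", 10), Attr("b2", 11), Attr("c1", 12))

  // Both children expose the same columns, but through different expression IDs.
  val child1Output: Seq[Attr] = Seq(Attr("b1", 20), Attr("b2", 21), Attr("c1", 22))
  val child2Output: Seq[Attr] = Seq(Attr("b1", 30), Attr("b2", 31), Attr("c1", 32))

  // Map each child attribute to the Union output attribute at the same position.
  def toUnionOutput(childOutput: Seq[Attr]): Map[Attr, Attr] = {
    require(childOutput.size == unionOutput.size, "child and Union widths must match")
    childOutput.zip(unionOutput).toMap
  }

  // Hypothetical: c2 is still referenced inside child 1 but is not part of its output.
  val danglingC2: Attr = Attr("c2", 23)

  def main(args: Array[String]): Unit = {
    val m1 = toUnionOutput(child1Output)
    val m2 = toUnionOutput(child2Output)
    println(m1.get(Attr("c1", 22))) // Some(Attr(c1,12))
    println(m2.get(Attr("c1", 32))) // Some(Attr(c1,12))
    println(m1.get(danglingC2))     // None: there is nothing to map c2 to
  }
}
```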
For more context, please read this short investigation doc (the link has been updated and is now public):
https://docs.google.com/document/d/1_pJIi_8GuLHOXabLEgRy2e7OHw-OIBnWbwGwSkwIcxg/edit?usp=sharing
### Does this PR introduce _any_ user-facing change?
Yes. The bug is fixed, and the query above is now handled without error.
### How was this patch tested?
Added a test query to `join-lateral.sql`, with the corresponding analyzer-results and results golden files.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48109 from averyqi-db/averyqi-db/SPARK-49646.
Authored-by: Avery Qi <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalyst/optimizer/DecorrelateInnerQuery.scala | 2 +-
.../analyzer-results/join-lateral.sql.out | 47 ++++++++++++++++++++++
.../resources/sql-tests/inputs/join-lateral.sql | 21 ++++++++++
.../sql-tests/results/join-lateral.sql.out | 27 +++++++++++++
4 files changed, 96 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index 424f4b96271d..6c0d7189862d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -1064,7 +1064,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
// Project, they could get added at the beginning or the end of the output columns
// depending on the child plan.
// The inner expressions for the domain are the values of newOuterReferenceMap.
- val domainProjections = collectedChildOuterReferences.map(newOuterReferenceMap(_))
+ val domainProjections = newOuterReferences.map(newOuterReferenceMap(_))
val newChild = Project(child.output ++ domainProjections, decorrelatedChild)
(newChild, newJoinCond, newOuterReferenceMap)
}
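A hypothetical model of this one-line change follows. It is not the actual `DecorrelateInnerQuery` code: `Attr`, `freshInstance`, and `domainProjections` are illustrative names, and the sketch assumes that `newOuterReferences` is `parentOuterReferences ++ collectedChildOuterReferences` without duplicates. In the model, the old expression makes each child project only the outer references it collected itself, so the children end up with different domain columns and `c2` is absent; the new expression makes every child project the same full set.

```scala
// Hypothetical stand-in for an attribute reference: a name plus an expression ID.
final case class Attr(name: String, exprId: Int)

object DomainProjectionFix {
  // Outer references needed above the Union, modeled on the example filter
  // `(t_inner.b1 < t_outer.c2 or ...) and t_inner.b1 = t_outer.c1`.
  val parentOuterReferences: Seq[Attr] = Seq(Attr("c2", 2), Attr("c1", 1))

  // Outer references collected inside each Union child: the `from t2` branch
  // has none, the `select t_outer.c1` branch has c1.
  val collectedPerChild: Seq[Seq[Attr]] = Seq(Seq.empty, Seq(Attr("c1", 1)))

  // Illustrative fresh instance of an outer reference for child `i`.
  def freshInstance(a: Attr, i: Int): Attr = Attr(a.name, 100 * (i + 1) + a.exprId)

  def domainProjections(fixed: Boolean): Seq[Seq[Attr]] =
    collectedPerChild.zipWithIndex.map { case (collected, i) =>
      // Assumption: parent references merged with the child's own, deduplicated.
      val newOuterReferences = (parentOuterReferences ++ collected).distinct
      val newOuterReferenceMap = newOuterReferences.map(o => o -> freshInstance(o, i)).toMap
      // Old code projected `collected`; the fix projects `newOuterReferences`.
      val source = if (fixed) newOuterReferences else collected
      source.map(newOuterReferenceMap(_))
    }

  def main(args: Array[String]): Unit = {
    println(domainProjections(fixed = false)) // List(List(), List(Attr(c1,201)))
    println(domainProjections(fixed = true))
    // List(List(Attr(c2,102), Attr(c1,101)), List(Attr(c2,202), Attr(c1,201)))
  }
}
```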
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
index e81ee769f57d..5bf893605423 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
@@ -3017,6 +3017,53 @@ Project [c1#x, c2#x, t#x]
+- LocalRelation [col1#x, col2#x]
+-- !query
+select 1
+from t1 as t_outer
+left join
+ lateral(
+ select b1,b2
+ from
+ (
+ select
+ t2.c1 as b1,
+ 1 as b2
+ from t2
+ union
+ select t_outer.c1 as b1,
+ null as b2
+ ) as t_inner
+ where (t_inner.b1 < t_outer.c2 or t_inner.b1 is null)
+ and t_inner.b1 = t_outer.c1
+ order by t_inner.b1,t_inner.b2 desc limit 1
+ ) as lateral_table
+-- !query analysis
+Project [1 AS 1#x]
++- LateralJoin lateral-subquery#x [c2#x && c1#x && c1#x], LeftOuter
+   :  +- SubqueryAlias lateral_table
+   :     +- GlobalLimit 1
+   :        +- LocalLimit 1
+   :           +- Sort [b1#x ASC NULLS FIRST, b2#x DESC NULLS LAST], true
+   :              +- Project [b1#x, b2#x]
+   :                 +- Filter (((b1#x < outer(c2#x)) OR isnull(b1#x)) AND (b1#x = outer(c1#x)))
+   :                    +- SubqueryAlias t_inner
+   :                       +- Distinct
+   :                          +- Union false, false
+   :                             :- Project [c1#x AS b1#x, 1 AS b2#x]
+   :                             :  +- SubqueryAlias spark_catalog.default.t2
+   :                             :     +- View (`spark_catalog`.`default`.`t2`, [c1#x, c2#x])
+   :                             :        +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+   :                             :           +- LocalRelation [col1#x, col2#x]
+   :                             +- Project [b1#x, cast(b2#x as int) AS b2#x]
+   :                                +- Project [outer(c1#x) AS b1#x, null AS b2#x]
+   :                                   +- OneRowRelation
+   +- SubqueryAlias t_outer
+      +- SubqueryAlias spark_catalog.default.t1
+         +- View (`spark_catalog`.`default`.`t1`, [c1#x, c2#x])
+            +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+               +- LocalRelation [col1#x, col2#x]
+
+
-- !query
DROP VIEW t1
-- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
index 8bff1f109aa6..e3cef9207d20 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
@@ -531,6 +531,27 @@ select * from t1 join lateral
(select t4.c1 as t from t4 where t1.c1 = t4.c1)) as foo
order by foo.t limit 5);
+
+select 1
+from t1 as t_outer
+left join
+ lateral(
+ select b1,b2
+ from
+ (
+ select
+ t2.c1 as b1,
+ 1 as b2
+ from t2
+ union
+ select t_outer.c1 as b1,
+ null as b2
+ ) as t_inner
+ where (t_inner.b1 < t_outer.c2 or t_inner.b1 is null)
+ and t_inner.b1 = t_outer.c1
+ order by t_inner.b1,t_inner.b2 desc limit 1
+ ) as lateral_table;
+
-- clean up
DROP VIEW t1;
DROP VIEW t2;
diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
index ced8d6398a66..11bafb2cf63c 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
@@ -1878,6 +1878,33 @@ struct<c1:int,c2:int,t:int>
1 2 3
+-- !query
+select 1
+from t1 as t_outer
+left join
+ lateral(
+ select b1,b2
+ from
+ (
+ select
+ t2.c1 as b1,
+ 1 as b2
+ from t2
+ union
+ select t_outer.c1 as b1,
+ null as b2
+ ) as t_inner
+ where (t_inner.b1 < t_outer.c2 or t_inner.b1 is null)
+ and t_inner.b1 = t_outer.c1
+ order by t_inner.b1,t_inner.b2 desc limit 1
+ ) as lateral_table
+-- !query schema
+struct<1:int>
+-- !query output
+1
+1
+
+
-- !query
DROP VIEW t1
-- !query schema