This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dc73a8d7e96e [SPARK-46526][SQL] Support LIMIT over correlated
subqueries where predicates only reference outer table
dc73a8d7e96e is described below
commit dc73a8d7e96ead55053096971c838908b7c90527
Author: Andrey Gubichev <[email protected]>
AuthorDate: Wed Feb 7 11:54:21 2024 +0800
[SPARK-46526][SQL] Support LIMIT over correlated subqueries where
predicates only reference outer table
### What changes were proposed in this pull request?
The type of query that this PR addresses is the following:
```
SELECT COUNT(DISTINCT(t1a))
FROM t1
WHERE t1d IN (SELECT t2d
FROM t2
WHERE t1a IS NOT NULL
LIMIT 10);
```
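Here, the subquery predicate `t1a IS NOT NULL` does not reference the inner
table at all. Our standard decorrelation technique computes 10 values of t2d for
every value of the correlated inner columns (a row_number() window partitioned by
those columns). That does not work here, because there are no such columns to
partition by. Such predicates can instead be lifted above the LIMIT 10, so the
limit is computed over the inner query directly. This PR implements exactly that.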
### Why are the changes needed?
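To fix a bug: queries like the one above, whose correlated predicates under a
LIMIT reference only the outer table, were previously broken.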
### Does this PR introduce _any_ user-facing change?
Yes. Some previously broken queries now work.
### How was this patch tested?
Added SQL query tests covering IN, EXISTS, and scalar subqueries.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #44514 from agubichev/SPARK-46526_limit_corr.
Authored-by: Andrey Gubichev <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalyst/optimizer/DecorrelateInnerQuery.scala | 34 +++++----
.../exists-subquery/exists-orderby-limit.sql.out | 57 ++++++++++++++
.../subquery/in-subquery/in-limit.sql.out | 85 +++++++++++++++++++++
.../scalar-subquery-predicate.sql.out | 89 ++++++++++++++++++++++
.../exists-subquery/exists-orderby-limit.sql | 18 +++++
.../inputs/subquery/in-subquery/in-limit.sql | 21 +++++
.../scalar-subquery/scalar-subquery-predicate.sql | 22 ++++++
.../exists-subquery/exists-orderby-limit.sql.out | 39 ++++++++++
.../results/subquery/in-subquery/in-limit.sql.out | 39 ++++++++++
.../scalar-subquery-predicate.sql.out | 40 ++++++++++
10 files changed, 431 insertions(+), 13 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index eca392fd84ca..1ebf0c7b39a4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -673,20 +673,28 @@ object DecorrelateInnerQuery extends PredicateHelper {
decorrelate(child, parentOuterReferences, aggregated = true,
underSetOp)
val collectedChildOuterReferences =
collectOuterReferencesInPlanTree(child)
// Add outer references to the PARTITION BY clause
- val partitionFields =
collectedChildOuterReferences.map(outerReferenceMap(_)).toSeq
- val orderByFields = replaceOuterReferences(ordering,
outerReferenceMap)
+ val partitionFields = collectedChildOuterReferences
+ .filter(outerReferenceMap.contains(_))
+ .map(outerReferenceMap(_)).toSeq
+ if (partitionFields.isEmpty) {
+ // Underlying subquery has no predicates connecting inner and
outer query.
+ // In this case, limit can be computed over the inner query
directly.
+ (Limit(limit, newChild), joinCond, outerReferenceMap)
+ } else {
+ val orderByFields = replaceOuterReferences(ordering,
outerReferenceMap)
- val rowNumber = WindowExpression(RowNumber(),
- WindowSpecDefinition(partitionFields, orderByFields,
- SpecifiedWindowFrame(RowFrame, UnboundedPreceding,
CurrentRow)))
- val rowNumberAlias = Alias(rowNumber, "rn")()
- // Window function computes row_number() when partitioning by
correlated references,
- // and projects all the other fields from the input.
- val window = Window(Seq(rowNumberAlias),
- partitionFields, orderByFields, newChild)
- val filter = Filter(LessThanOrEqual(rowNumberAlias.toAttribute,
limit), window)
- val project = Project(newChild.output, filter)
- (project, joinCond, outerReferenceMap)
+ val rowNumber = WindowExpression(RowNumber(),
+ WindowSpecDefinition(partitionFields, orderByFields,
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding,
CurrentRow)))
+ val rowNumberAlias = Alias(rowNumber, "rn")()
+ // Window function computes row_number() when partitioning by
correlated references,
+ // and projects all the other fields from the input.
+ val window = Window(Seq(rowNumberAlias),
+ partitionFields, orderByFields, newChild)
+ val filter = Filter(LessThanOrEqual(rowNumberAlias.toAttribute,
limit), window)
+ val project = Project(newChild.output, filter)
+ (project, joinCond, outerReferenceMap)
+ }
case w @ Window(projectList, partitionSpec, orderSpec, child) =>
val outerReferences = collectOuterReferences(w.expressions)
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-orderby-limit.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-orderby-limit.sql.out
index 0694e8ae52a7..4bfb6c5b843c 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-orderby-limit.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-orderby-limit.sql.out
@@ -615,6 +615,63 @@ Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x,
dept_id#x]
+-- !query
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT max(dept.dept_id)
+ FROM dept
+ WHERE emp.salary > 200
+ LIMIT 1)
+-- !query analysis
+Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
++- Filter exists#x [salary#x]
+ : +- GlobalLimit 1
+ : +- LocalLimit 1
+ : +- Aggregate [max(dept_id#x) AS max(dept_id)#x]
+ : +- Filter (outer(salary#x) > cast(200 as double))
+ : +- SubqueryAlias dept
+ : +- View (`DEPT`, [dept_id#x, dept_name#x, state#x])
+ : +- Project [cast(dept_id#x as int) AS dept_id#x,
cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) AS state#x]
+ : +- Project [dept_id#x, dept_name#x, state#x]
+ : +- SubqueryAlias DEPT
+ : +- LocalRelation [dept_id#x, dept_name#x,
state#x]
+ +- SubqueryAlias emp
+ +- View (`EMP`, [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x])
+ +- Project [cast(id#x as int) AS id#x, cast(emp_name#x as string) AS
emp_name#x, cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS
salary#x, cast(dept_id#x as int) AS dept_id#x]
+ +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+ +- SubqueryAlias EMP
+ +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x,
dept_id#x]
+
+
+-- !query
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT state, max(dept.dept_name)
+ FROM dept
+ WHERE emp.salary > 200
+ GROUP BY state
+ LIMIT 1)
+-- !query analysis
+Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
++- Filter exists#x [salary#x]
+ : +- GlobalLimit 1
+ : +- LocalLimit 1
+ : +- Aggregate [state#x], [state#x, max(dept_name#x) AS
max(dept_name)#x]
+ : +- Filter (outer(salary#x) > cast(200 as double))
+ : +- SubqueryAlias dept
+ : +- View (`DEPT`, [dept_id#x, dept_name#x, state#x])
+ : +- Project [cast(dept_id#x as int) AS dept_id#x,
cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) AS state#x]
+ : +- Project [dept_id#x, dept_name#x, state#x]
+ : +- SubqueryAlias DEPT
+ : +- LocalRelation [dept_id#x, dept_name#x,
state#x]
+ +- SubqueryAlias emp
+ +- View (`EMP`, [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x])
+ +- Project [cast(id#x as int) AS id#x, cast(emp_name#x as string) AS
emp_name#x, cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS
salary#x, cast(dept_id#x as int) AS dept_id#x]
+ +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+ +- SubqueryAlias EMP
+ +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x,
dept_id#x]
+
+
-- !query
SELECT *
FROM emp
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-limit.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-limit.sql.out
index a35c281c3908..d19155916f4a 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-limit.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-limit.sql.out
@@ -889,6 +889,91 @@ Offset 1
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x,
t1f#x, t1g#x, t1h#x, t1i#x]
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT t2d
+ FROM t2
+ WHERE t1a IS NOT NULL
+ LIMIT 10)
+-- !query analysis
+Aggregate [count(distinct t1a#x) AS count(DISTINCT t1a)#xL]
++- Filter t1d#xL IN (list#x [t1a#x])
+ : +- GlobalLimit 10
+ : +- LocalLimit 10
+ : +- Project [t2d#xL]
+ : +- Filter isnotnull(outer(t1a#x))
+ : +- SubqueryAlias t2
+ : +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x])
+ : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x
as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS
t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x,
cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x,
cast(t2i#x as date) AS t2i#x]
+ : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x]
+ : +- SubqueryAlias t2
+ : +- LocalRelation [t2a#x, t2b#x, t2c#x,
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+ +- SubqueryAlias t1
+ +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x,
t1i#x])
+ +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint)
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL,
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date)
AS t1i#x]
+ +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x,
t1h#x, t1i#x]
+ +- SubqueryAlias t1
+ +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x,
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT MAX(t2d)
+ FROM t2
+ WHERE t1a IS NOT NULL
+ LIMIT 10)
+-- !query analysis
+Aggregate [count(distinct t1a#x) AS count(DISTINCT t1a)#xL]
++- Filter t1d#xL IN (list#x [t1a#x])
+ : +- GlobalLimit 10
+ : +- LocalLimit 10
+ : +- Aggregate [max(t2d#xL) AS max(t2d)#xL]
+ : +- Filter isnotnull(outer(t1a#x))
+ : +- SubqueryAlias t2
+ : +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x])
+ : +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x
as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS
t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x,
cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x,
cast(t2i#x as date) AS t2i#x]
+ : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x]
+ : +- SubqueryAlias t2
+ : +- LocalRelation [t2a#x, t2b#x, t2c#x,
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+ +- SubqueryAlias t1
+ +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x,
t1i#x])
+ +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint)
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL,
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date)
AS t1i#x]
+ +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x,
t1h#x, t1i#x]
+ +- SubqueryAlias t1
+ +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x,
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT DISTINCT t2d
+ FROM t2
+ WHERE t1a IS NOT NULL
+ LIMIT 10)
+-- !query analysis
+Aggregate [count(distinct t1a#x) AS count(DISTINCT t1a)#xL]
++- Filter t1d#xL IN (list#x [t1a#x])
+ : +- GlobalLimit 10
+ : +- LocalLimit 10
+ : +- Distinct
+ : +- Project [t2d#xL]
+ : +- Filter isnotnull(outer(t1a#x))
+ : +- SubqueryAlias t2
+ : +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x])
+ : +- Project [cast(t2a#x as string) AS t2a#x,
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x,
cast(t2i#x as date) AS t2i#x]
+ : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x]
+ : +- SubqueryAlias t2
+ : +- LocalRelation [t2a#x, t2b#x, t2c#x,
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+ +- SubqueryAlias t1
+ +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x,
t1i#x])
+ +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint)
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL,
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date)
AS t1i#x]
+ +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x,
t1h#x, t1i#x]
+ +- SubqueryAlias t1
+ +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x,
t1g#x, t1h#x, t1i#x]
+
+
-- !query
set spark.sql.optimizer.decorrelateExistsIn.enabled = false
-- !query analysis
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
index c6fc5fed694c..fd1acb113e3a 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
@@ -1162,6 +1162,95 @@ Project [t1a#x, t1b#x]
+- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x,
t1g#x, t1h#x, t1i#x]
+-- !query
+SELECT t1a, t1b
+FROM t1
+WHERE t1c = (SELECT t2c
+ FROM t2
+ WHERE t1b < t1d
+ ORDER BY t2c LIMIT 1)
+-- !query analysis
+Project [t1a#x, t1b#x]
++- Filter (t1c#x = scalar-subquery#x [t1b#x && t1d#xL])
+ : +- GlobalLimit 1
+ : +- LocalLimit 1
+ : +- Sort [t2c#x ASC NULLS FIRST], true
+ : +- Project [t2c#x]
+ : +- Filter (cast(outer(t1b#x) as bigint) < outer(t1d#xL))
+ : +- SubqueryAlias t2
+ : +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x])
+ : +- Project [cast(t2a#x as string) AS t2a#x,
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x,
cast(t2i#x as date) AS t2i#x]
+ : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x]
+ : +- SubqueryAlias t2
+ : +- LocalRelation [t2a#x, t2b#x, t2c#x,
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+ +- SubqueryAlias t1
+ +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x,
t1i#x])
+ +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint)
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL,
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date)
AS t1i#x]
+ +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x,
t1h#x, t1i#x]
+ +- SubqueryAlias t1
+ +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x,
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT t1a, t1b
+FROM t1
+WHERE t1c = (SELECT MAX(t2c)
+ FROM t2
+ WHERE t1b < t1d
+ ORDER BY min(t2c) LIMIT 1)
+-- !query analysis
+Project [t1a#x, t1b#x]
++- Filter (t1c#x = scalar-subquery#x [t1b#x && t1d#xL])
+ : +- GlobalLimit 1
+ : +- LocalLimit 1
+ : +- Project [max(t2c)#x]
+ : +- Sort [min(t2c#x)#x ASC NULLS FIRST], true
+ : +- Aggregate [max(t2c#x) AS max(t2c)#x, min(t2c#x) AS
min(t2c#x)#x]
+ : +- Filter (cast(outer(t1b#x) as bigint) < outer(t1d#xL))
+ : +- SubqueryAlias t2
+ : +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x])
+ : +- Project [cast(t2a#x as string) AS t2a#x,
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x,
cast(t2i#x as date) AS t2i#x]
+ : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL,
t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+ : +- SubqueryAlias t2
+ : +- LocalRelation [t2a#x, t2b#x, t2c#x,
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+ +- SubqueryAlias t1
+ +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x,
t1i#x])
+ +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint)
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL,
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date)
AS t1i#x]
+ +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x,
t1h#x, t1i#x]
+ +- SubqueryAlias t1
+ +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x,
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT t1a, t1b
+FROM t1
+WHERE t1c = (SELECT DISTINCT t2c
+ FROM t2
+ WHERE t1b < t1d
+ ORDER BY t2c LIMIT 1)
+-- !query analysis
+Project [t1a#x, t1b#x]
++- Filter (t1c#x = scalar-subquery#x [t1b#x && t1d#xL])
+ : +- GlobalLimit 1
+ : +- LocalLimit 1
+ : +- Sort [t2c#x ASC NULLS FIRST], true
+ : +- Distinct
+ : +- Project [t2c#x]
+ : +- Filter (cast(outer(t1b#x) as bigint) < outer(t1d#xL))
+ : +- SubqueryAlias t2
+ : +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x,
t2f#x, t2g#x, t2h#x, t2i#x])
+ : +- Project [cast(t2a#x as string) AS t2a#x,
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x,
cast(t2i#x as date) AS t2i#x]
+ : +- Project [t2a#x, t2b#x, t2c#x, t2d#xL,
t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+ : +- SubqueryAlias t2
+ : +- LocalRelation [t2a#x, t2b#x, t2c#x,
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+ +- SubqueryAlias t1
+ +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x,
t1i#x])
+ +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint)
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL,
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date)
AS t1i#x]
+ +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x,
t1h#x, t1i#x]
+ +- SubqueryAlias t1
+ +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x,
t1g#x, t1h#x, t1i#x]
+
+
-- !query
CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
-- !query analysis
diff --git
a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
index 2e3055db60af..9ff3409b21a3 100644
---
a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
+++
b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
@@ -204,6 +204,24 @@ WHERE EXISTS (SELECT max(dept.dept_id)
GROUP BY state
LIMIT 1);
+-- SPARK-46526: LIMIT over correlated predicate that references only the outer
table.
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT max(dept.dept_id)
+ FROM dept
+ WHERE emp.salary > 200
+ LIMIT 1);
+
+-- SPARK-46526: LIMIT over correlated predicate that references only the outer
table,
+-- and a group by.
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT state, max(dept.dept_name)
+ FROM dept
+ WHERE emp.salary > 200
+ GROUP BY state
+ LIMIT 1);
+
-- limit and offset in the not exists subquery block.
-- TC.03.03
SELECT *
diff --git
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
index a76da33e501c..e2f2e05d9b85 100644
---
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+++
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
@@ -308,6 +308,27 @@ GROUP BY t1b
ORDER BY t1b NULLS last
OFFSET 1;
+-- SPARK-46526: LIMIT over correlated predicate that references only the outer
table.
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT t2d
+ FROM t2
+ WHERE t1a IS NOT NULL
+ LIMIT 10);
+
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT MAX(t2d)
+ FROM t2
+ WHERE t1a IS NOT NULL
+ LIMIT 10);
+
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT DISTINCT t2d
+ FROM t2
+ WHERE t1a IS NOT NULL
+ LIMIT 10);
set spark.sql.optimizer.decorrelateExistsIn.enabled = false;
-- LIMIT is not supported in correlated IN, unless the
DECORRELATE_EXISTS_AND_IN_SUBQUERIES
diff --git
a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
index 20ba10176196..902ae2c807c3 100644
---
a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
+++
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
@@ -351,6 +351,28 @@ WHERE t1c = (SELECT t2c
WHERE t2c = t1c
ORDER BY t2c LIMIT 1);
+-- SPARK-46526: LIMIT over correlated predicate that references only the outer
table.
+SELECT t1a, t1b
+FROM t1
+WHERE t1c = (SELECT t2c
+ FROM t2
+ WHERE t1b < t1d
+ ORDER BY t2c LIMIT 1);
+
+SELECT t1a, t1b
+FROM t1
+WHERE t1c = (SELECT MAX(t2c)
+ FROM t2
+ WHERE t1b < t1d
+ ORDER BY min(t2c) LIMIT 1);
+
+SELECT t1a, t1b
+FROM t1
+WHERE t1c = (SELECT DISTINCT t2c
+ FROM t2
+ WHERE t1b < t1d
+ ORDER BY t2c LIMIT 1);
+
-- Set operations in correlation path
CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0);
diff --git
a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
index 6c992901712e..2440b08780bb 100644
---
a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
+++
b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
@@ -406,6 +406,45 @@
struct<id:int,emp_name:string,hiredate:date,salary:double,dept_id:int>
800 emp 8 2016-01-01 150.0 70
+-- !query
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT max(dept.dept_id)
+ FROM dept
+ WHERE emp.salary > 200
+ LIMIT 1)
+-- !query schema
+struct<id:int,emp_name:string,hiredate:date,salary:double,dept_id:int>
+-- !query output
+100 emp 1 2005-01-01 100.0 10
+100 emp 1 2005-01-01 100.0 10
+200 emp 2 2003-01-01 200.0 10
+300 emp 3 2002-01-01 300.0 20
+400 emp 4 2005-01-01 400.0 30
+500 emp 5 2001-01-01 400.0 NULL
+600 emp 6 - no dept 2001-01-01 400.0 100
+700 emp 7 2010-01-01 400.0 100
+800 emp 8 2016-01-01 150.0 70
+
+
+-- !query
+SELECT *
+FROM emp
+WHERE EXISTS (SELECT state, max(dept.dept_name)
+ FROM dept
+ WHERE emp.salary > 200
+ GROUP BY state
+ LIMIT 1)
+-- !query schema
+struct<id:int,emp_name:string,hiredate:date,salary:double,dept_id:int>
+-- !query output
+300 emp 3 2002-01-01 300.0 20
+400 emp 4 2005-01-01 400.0 30
+500 emp 5 2001-01-01 400.0 NULL
+600 emp 6 - no dept 2001-01-01 400.0 100
+700 emp 7 2010-01-01 400.0 100
+
+
-- !query
SELECT *
FROM emp
diff --git
a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
index 42dd330fbaa9..1bc7dea188d1 100644
---
a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
+++
b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
@@ -543,6 +543,45 @@ struct<count(DISTINCT t1a):bigint,t1b:smallint>
1 NULL
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT t2d
+ FROM t2
+ WHERE t1a IS NOT NULL
+ LIMIT 10)
+-- !query schema
+struct<count(DISTINCT t1a):bigint>
+-- !query output
+4
+
+
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT MAX(t2d)
+ FROM t2
+ WHERE t1a IS NOT NULL
+ LIMIT 10)
+-- !query schema
+struct<count(DISTINCT t1a):bigint>
+-- !query output
+0
+
+
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT DISTINCT t2d
+ FROM t2
+ WHERE t1a IS NOT NULL
+ LIMIT 10)
+-- !query schema
+struct<count(DISTINCT t1a):bigint>
+-- !query output
+4
+
+
-- !query
set spark.sql.optimizer.decorrelateExistsIn.enabled = false
-- !query schema
diff --git
a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
index e714ec0bad0b..e85fed2417ef 100644
---
a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
+++
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
@@ -588,6 +588,46 @@ val1d NULL
val1d NULL
+-- !query
+SELECT t1a, t1b
+FROM t1
+WHERE t1c = (SELECT t2c
+ FROM t2
+ WHERE t1b < t1d
+ ORDER BY t2c LIMIT 1)
+-- !query schema
+struct<t1a:string,t1b:smallint>
+-- !query output
+val1a 16
+
+
+-- !query
+SELECT t1a, t1b
+FROM t1
+WHERE t1c = (SELECT MAX(t2c)
+ FROM t2
+ WHERE t1b < t1d
+ ORDER BY min(t2c) LIMIT 1)
+-- !query schema
+struct<t1a:string,t1b:smallint>
+-- !query output
+val1b 8
+val1c 8
+
+
+-- !query
+SELECT t1a, t1b
+FROM t1
+WHERE t1c = (SELECT DISTINCT t2c
+ FROM t2
+ WHERE t1b < t1d
+ ORDER BY t2c LIMIT 1)
+-- !query schema
+struct<t1a:string,t1b:smallint>
+-- !query output
+val1a 16
+
+
-- !query
CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
-- !query schema
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]