This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dc73a8d7e96e [SPARK-46526][SQL] Support LIMIT over correlated 
subqueries where predicates only reference outer table
dc73a8d7e96e is described below

commit dc73a8d7e96ead55053096971c838908b7c90527
Author: Andrey Gubichev <[email protected]>
AuthorDate: Wed Feb 7 11:54:21 2024 +0800

    [SPARK-46526][SQL] Support LIMIT over correlated subqueries where 
predicates only reference outer table
    
    ### What changes were proposed in this pull request?
    
    The type of query that this PR addresses is the following:
    
    ```
    SELECT COUNT(DISTINCT(t1a))
    FROM t1
    WHERE t1d IN (SELECT t2d
                  FROM   t2
                  WHERE t1a IS NOT NULL
                  LIMIT 10);
    ```
    
    Here, the subquery predicate `t1a IS NOT NULL` does not reference the inner 
table at all, so our standard decorrelation technique of "compute 10 values of 
t2d for every value of the inner table" does not work: there is no inner column 
to partition by. Such predicates can instead be lifted above the LIMIT 10, so 
the limit is computed over the inner query directly. This PR does exactly that.
    
    ### Why are the changes needed?
    
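    Fixes SPARK-46526: correlated subqueries with a LIMIT whose predicates reference only the outer table could not be decorrelated.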
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Some queries that previously failed (LIMIT over a correlated subquery whose predicates reference only the outer table) now work.
    
    ### How was this patch tested?
    
    Query tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44514 from agubichev/SPARK-46526_limit_corr.
    
    Authored-by: Andrey Gubichev <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../catalyst/optimizer/DecorrelateInnerQuery.scala | 34 +++++----
 .../exists-subquery/exists-orderby-limit.sql.out   | 57 ++++++++++++++
 .../subquery/in-subquery/in-limit.sql.out          | 85 +++++++++++++++++++++
 .../scalar-subquery-predicate.sql.out              | 89 ++++++++++++++++++++++
 .../exists-subquery/exists-orderby-limit.sql       | 18 +++++
 .../inputs/subquery/in-subquery/in-limit.sql       | 21 +++++
 .../scalar-subquery/scalar-subquery-predicate.sql  | 22 ++++++
 .../exists-subquery/exists-orderby-limit.sql.out   | 39 ++++++++++
 .../results/subquery/in-subquery/in-limit.sql.out  | 39 ++++++++++
 .../scalar-subquery-predicate.sql.out              | 40 ++++++++++
 10 files changed, 431 insertions(+), 13 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index eca392fd84ca..1ebf0c7b39a4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -673,20 +673,28 @@ object DecorrelateInnerQuery extends PredicateHelper {
               decorrelate(child, parentOuterReferences, aggregated = true, 
underSetOp)
             val collectedChildOuterReferences = 
collectOuterReferencesInPlanTree(child)
             // Add outer references to the PARTITION BY clause
-            val partitionFields = 
collectedChildOuterReferences.map(outerReferenceMap(_)).toSeq
-            val orderByFields = replaceOuterReferences(ordering, 
outerReferenceMap)
+            val partitionFields = collectedChildOuterReferences
+              .filter(outerReferenceMap.contains(_))
+              .map(outerReferenceMap(_)).toSeq
+            if (partitionFields.isEmpty) {
+              // Underlying subquery has no predicates connecting inner and 
outer query.
+              // In this case, limit can be computed over the inner query 
directly.
+              (Limit(limit, newChild), joinCond, outerReferenceMap)
+            } else {
+              val orderByFields = replaceOuterReferences(ordering, 
outerReferenceMap)
 
-            val rowNumber = WindowExpression(RowNumber(),
-              WindowSpecDefinition(partitionFields, orderByFields,
-                SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
CurrentRow)))
-            val rowNumberAlias = Alias(rowNumber, "rn")()
-            // Window function computes row_number() when partitioning by 
correlated references,
-            // and projects all the other fields from the input.
-            val window = Window(Seq(rowNumberAlias),
-              partitionFields, orderByFields, newChild)
-            val filter = Filter(LessThanOrEqual(rowNumberAlias.toAttribute, 
limit), window)
-            val project = Project(newChild.output, filter)
-            (project, joinCond, outerReferenceMap)
+              val rowNumber = WindowExpression(RowNumber(),
+                WindowSpecDefinition(partitionFields, orderByFields,
+                  SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
CurrentRow)))
+              val rowNumberAlias = Alias(rowNumber, "rn")()
+              // Window function computes row_number() when partitioning by 
correlated references,
+              // and projects all the other fields from the input.
+              val window = Window(Seq(rowNumberAlias),
+                partitionFields, orderByFields, newChild)
+              val filter = Filter(LessThanOrEqual(rowNumberAlias.toAttribute, 
limit), window)
+              val project = Project(newChild.output, filter)
+              (project, joinCond, outerReferenceMap)
+            }
 
           case w @ Window(projectList, partitionSpec, orderSpec, child) =>
             val outerReferences = collectOuterReferences(w.expressions)
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-orderby-limit.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-orderby-limit.sql.out
index 0694e8ae52a7..4bfb6c5b843c 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-orderby-limit.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-orderby-limit.sql.out
@@ -615,6 +615,63 @@ Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
                   +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, 
dept_id#x]
 
 
+-- !query
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT max(dept.dept_id)
+               FROM   dept
+               WHERE  emp.salary > 200
+               LIMIT  1)
+-- !query analysis
+Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
++- Filter exists#x [salary#x]
+   :  +- GlobalLimit 1
+   :     +- LocalLimit 1
+   :        +- Aggregate [max(dept_id#x) AS max(dept_id)#x]
+   :           +- Filter (outer(salary#x) > cast(200 as double))
+   :              +- SubqueryAlias dept
+   :                 +- View (`DEPT`, [dept_id#x, dept_name#x, state#x])
+   :                    +- Project [cast(dept_id#x as int) AS dept_id#x, 
cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) AS state#x]
+   :                       +- Project [dept_id#x, dept_name#x, state#x]
+   :                          +- SubqueryAlias DEPT
+   :                             +- LocalRelation [dept_id#x, dept_name#x, 
state#x]
+   +- SubqueryAlias emp
+      +- View (`EMP`, [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x])
+         +- Project [cast(id#x as int) AS id#x, cast(emp_name#x as string) AS 
emp_name#x, cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS 
salary#x, cast(dept_id#x as int) AS dept_id#x]
+            +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+               +- SubqueryAlias EMP
+                  +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, 
dept_id#x]
+
+
+-- !query
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT state, max(dept.dept_name)
+               FROM   dept
+               WHERE  emp.salary > 200
+               GROUP BY state
+               LIMIT  1)
+-- !query analysis
+Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
++- Filter exists#x [salary#x]
+   :  +- GlobalLimit 1
+   :     +- LocalLimit 1
+   :        +- Aggregate [state#x], [state#x, max(dept_name#x) AS 
max(dept_name)#x]
+   :           +- Filter (outer(salary#x) > cast(200 as double))
+   :              +- SubqueryAlias dept
+   :                 +- View (`DEPT`, [dept_id#x, dept_name#x, state#x])
+   :                    +- Project [cast(dept_id#x as int) AS dept_id#x, 
cast(dept_name#x as string) AS dept_name#x, cast(state#x as string) AS state#x]
+   :                       +- Project [dept_id#x, dept_name#x, state#x]
+   :                          +- SubqueryAlias DEPT
+   :                             +- LocalRelation [dept_id#x, dept_name#x, 
state#x]
+   +- SubqueryAlias emp
+      +- View (`EMP`, [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x])
+         +- Project [cast(id#x as int) AS id#x, cast(emp_name#x as string) AS 
emp_name#x, cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS 
salary#x, cast(dept_id#x as int) AS dept_id#x]
+            +- Project [id#x, emp_name#x, hiredate#x, salary#x, dept_id#x]
+               +- SubqueryAlias EMP
+                  +- LocalRelation [id#x, emp_name#x, hiredate#x, salary#x, 
dept_id#x]
+
+
 -- !query
 SELECT *
 FROM   emp
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-limit.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-limit.sql.out
index a35c281c3908..d19155916f4a 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-limit.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-limit.sql.out
@@ -889,6 +889,91 @@ Offset 1
                         +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, 
t1f#x, t1g#x, t1h#x, t1i#x]
 
 
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT t2d
+              FROM   t2
+              WHERE t1a IS NOT NULL
+              LIMIT 10)
+-- !query analysis
+Aggregate [count(distinct t1a#x) AS count(DISTINCT t1a)#xL]
++- Filter t1d#xL IN (list#x [t1a#x])
+   :  +- GlobalLimit 10
+   :     +- LocalLimit 10
+   :        +- Project [t2d#xL]
+   :           +- Filter isnotnull(outer(t1a#x))
+   :              +- SubqueryAlias t2
+   :                 +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x])
+   :                    +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x 
as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS 
t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, 
cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                       +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x]
+   :                          +- SubqueryAlias t2
+   :                             +- LocalRelation [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, 
t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT MAX(t2d)
+              FROM   t2
+              WHERE t1a IS NOT NULL
+              LIMIT 10)
+-- !query analysis
+Aggregate [count(distinct t1a#x) AS count(DISTINCT t1a)#xL]
++- Filter t1d#xL IN (list#x [t1a#x])
+   :  +- GlobalLimit 10
+   :     +- LocalLimit 10
+   :        +- Aggregate [max(t2d#xL) AS max(t2d)#xL]
+   :           +- Filter isnotnull(outer(t1a#x))
+   :              +- SubqueryAlias t2
+   :                 +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x])
+   :                    +- Project [cast(t2a#x as string) AS t2a#x, cast(t2b#x 
as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as bigint) AS 
t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS t2f#x, 
cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                       +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x]
+   :                          +- SubqueryAlias t2
+   :                             +- LocalRelation [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, 
t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT DISTINCT t2d
+              FROM   t2
+              WHERE t1a IS NOT NULL
+              LIMIT 10)
+-- !query analysis
+Aggregate [count(distinct t1a#x) AS count(DISTINCT t1a)#xL]
++- Filter t1d#xL IN (list#x [t1a#x])
+   :  +- GlobalLimit 10
+   :     +- LocalLimit 10
+   :        +- Distinct
+   :           +- Project [t2d#xL]
+   :              +- Filter isnotnull(outer(t1a#x))
+   :                 +- SubqueryAlias t2
+   :                    +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x])
+   :                       +- Project [cast(t2a#x as string) AS t2a#x, 
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as 
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS 
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                          +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x]
+   :                             +- SubqueryAlias t2
+   :                                +- LocalRelation [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, 
t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
 -- !query
 set spark.sql.optimizer.decorrelateExistsIn.enabled = false
 -- !query analysis
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
index c6fc5fed694c..fd1acb113e3a 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
@@ -1162,6 +1162,95 @@ Project [t1a#x, t1b#x]
                   +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
 
 
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT t2c
+              FROM   t2
+              WHERE  t1b < t1d
+              ORDER BY t2c LIMIT 1)
+-- !query analysis
+Project [t1a#x, t1b#x]
++- Filter (t1c#x = scalar-subquery#x [t1b#x && t1d#xL])
+   :  +- GlobalLimit 1
+   :     +- LocalLimit 1
+   :        +- Sort [t2c#x ASC NULLS FIRST], true
+   :           +- Project [t2c#x]
+   :              +- Filter (cast(outer(t1b#x) as bigint) < outer(t1d#xL))
+   :                 +- SubqueryAlias t2
+   :                    +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x])
+   :                       +- Project [cast(t2a#x as string) AS t2a#x, 
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as 
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS 
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                          +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x]
+   :                             +- SubqueryAlias t2
+   :                                +- LocalRelation [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, 
t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT MAX(t2c)
+              FROM   t2
+              WHERE  t1b < t1d
+              ORDER BY min(t2c) LIMIT 1)
+-- !query analysis
+Project [t1a#x, t1b#x]
++- Filter (t1c#x = scalar-subquery#x [t1b#x && t1d#xL])
+   :  +- GlobalLimit 1
+   :     +- LocalLimit 1
+   :        +- Project [max(t2c)#x]
+   :           +- Sort [min(t2c#x)#x ASC NULLS FIRST], true
+   :              +- Aggregate [max(t2c#x) AS max(t2c)#x, min(t2c#x) AS 
min(t2c#x)#x]
+   :                 +- Filter (cast(outer(t1b#x) as bigint) < outer(t1d#xL))
+   :                    +- SubqueryAlias t2
+   :                       +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x])
+   :                          +- Project [cast(t2a#x as string) AS t2a#x, 
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as 
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS 
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                             +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, 
t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                                +- SubqueryAlias t2
+   :                                   +- LocalRelation [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, 
t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT DISTINCT t2c
+              FROM   t2
+              WHERE  t1b < t1d
+              ORDER BY t2c LIMIT 1)
+-- !query analysis
+Project [t1a#x, t1b#x]
++- Filter (t1c#x = scalar-subquery#x [t1b#x && t1d#xL])
+   :  +- GlobalLimit 1
+   :     +- LocalLimit 1
+   :        +- Sort [t2c#x ASC NULLS FIRST], true
+   :           +- Distinct
+   :              +- Project [t2c#x]
+   :                 +- Filter (cast(outer(t1b#x) as bigint) < outer(t1d#xL))
+   :                    +- SubqueryAlias t2
+   :                       +- View (`t2`, [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x])
+   :                          +- Project [cast(t2a#x as string) AS t2a#x, 
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as 
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS 
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                             +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, 
t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                                +- SubqueryAlias t2
+   :                                   +- LocalRelation [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, t1h#x, 
t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
 -- !query
 CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
 -- !query analysis
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
index 2e3055db60af..9ff3409b21a3 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-orderby-limit.sql
@@ -204,6 +204,24 @@ WHERE  EXISTS (SELECT max(dept.dept_id)
                GROUP  BY state
                LIMIT  1);
 
+-- SPARK-46526: LIMIT over correlated predicate that references only the outer 
table.
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT max(dept.dept_id)
+               FROM   dept
+               WHERE  emp.salary > 200
+               LIMIT  1);
+
+-- SPARK-46526: LIMIT over correlated predicate that references only the outer 
table,
+-- and a group by.
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT state, max(dept.dept_name)
+               FROM   dept
+               WHERE  emp.salary > 200
+               GROUP BY state
+               LIMIT  1);
+
 -- limit and offset in the not exists subquery block.
 -- TC.03.03
 SELECT *
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
index a76da33e501c..e2f2e05d9b85 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-limit.sql
@@ -308,6 +308,27 @@ GROUP  BY t1b
 ORDER BY t1b NULLS last
 OFFSET 1;
 
+-- SPARK-46526: LIMIT over correlated predicate that references only the outer 
table.
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT t2d
+              FROM   t2
+              WHERE t1a IS NOT NULL
+              LIMIT 10);
+
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT MAX(t2d)
+              FROM   t2
+              WHERE t1a IS NOT NULL
+              LIMIT 10);
+
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT DISTINCT t2d
+              FROM   t2
+              WHERE t1a IS NOT NULL
+              LIMIT 10);
 
 set spark.sql.optimizer.decorrelateExistsIn.enabled = false;
 -- LIMIT is not supported in correlated IN, unless the 
DECORRELATE_EXISTS_AND_IN_SUBQUERIES
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
index 20ba10176196..902ae2c807c3 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
@@ -351,6 +351,28 @@ WHERE  t1c = (SELECT t2c
               WHERE  t2c = t1c
               ORDER BY t2c LIMIT 1);
 
+-- SPARK-46526: LIMIT over correlated predicate that references only the outer 
table.
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT t2c
+              FROM   t2
+              WHERE  t1b < t1d
+              ORDER BY t2c LIMIT 1);
+
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT MAX(t2c)
+              FROM   t2
+              WHERE  t1b < t1d
+              ORDER BY min(t2c) LIMIT 1);
+
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT DISTINCT t2c
+              FROM   t2
+              WHERE  t1b < t1d
+              ORDER BY t2c LIMIT 1);
+
 -- Set operations in correlation path
 
 CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0);
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
index 6c992901712e..2440b08780bb 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-orderby-limit.sql.out
@@ -406,6 +406,45 @@ 
struct<id:int,emp_name:string,hiredate:date,salary:double,dept_id:int>
 800    emp 8   2016-01-01      150.0   70
 
 
+-- !query
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT max(dept.dept_id)
+               FROM   dept
+               WHERE  emp.salary > 200
+               LIMIT  1)
+-- !query schema
+struct<id:int,emp_name:string,hiredate:date,salary:double,dept_id:int>
+-- !query output
+100    emp 1   2005-01-01      100.0   10
+100    emp 1   2005-01-01      100.0   10
+200    emp 2   2003-01-01      200.0   10
+300    emp 3   2002-01-01      300.0   20
+400    emp 4   2005-01-01      400.0   30
+500    emp 5   2001-01-01      400.0   NULL
+600    emp 6 - no dept 2001-01-01      400.0   100
+700    emp 7   2010-01-01      400.0   100
+800    emp 8   2016-01-01      150.0   70
+
+
+-- !query
+SELECT *
+FROM   emp
+WHERE  EXISTS (SELECT state, max(dept.dept_name)
+               FROM   dept
+               WHERE  emp.salary > 200
+               GROUP BY state
+               LIMIT  1)
+-- !query schema
+struct<id:int,emp_name:string,hiredate:date,salary:double,dept_id:int>
+-- !query output
+300    emp 3   2002-01-01      300.0   20
+400    emp 4   2005-01-01      400.0   30
+500    emp 5   2001-01-01      400.0   NULL
+600    emp 6 - no dept 2001-01-01      400.0   100
+700    emp 7   2010-01-01      400.0   100
+
+
 -- !query
 SELECT *
 FROM   emp
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
index 42dd330fbaa9..1bc7dea188d1 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-limit.sql.out
@@ -543,6 +543,45 @@ struct<count(DISTINCT t1a):bigint,t1b:smallint>
 1      NULL
 
 
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT t2d
+              FROM   t2
+              WHERE t1a IS NOT NULL
+              LIMIT 10)
+-- !query schema
+struct<count(DISTINCT t1a):bigint>
+-- !query output
+4
+
+
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT MAX(t2d)
+              FROM   t2
+              WHERE t1a IS NOT NULL
+              LIMIT 10)
+-- !query schema
+struct<count(DISTINCT t1a):bigint>
+-- !query output
+0
+
+
+-- !query
+SELECT COUNT(DISTINCT(t1a))
+FROM t1
+WHERE t1d IN (SELECT DISTINCT t2d
+              FROM   t2
+              WHERE t1a IS NOT NULL
+              LIMIT 10)
+-- !query schema
+struct<count(DISTINCT t1a):bigint>
+-- !query output
+4
+
+
 -- !query
 set spark.sql.optimizer.decorrelateExistsIn.enabled = false
 -- !query schema
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
index e714ec0bad0b..e85fed2417ef 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
@@ -588,6 +588,46 @@ val1d      NULL
 val1d  NULL
 
 
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT t2c
+              FROM   t2
+              WHERE  t1b < t1d
+              ORDER BY t2c LIMIT 1)
+-- !query schema
+struct<t1a:string,t1b:smallint>
+-- !query output
+val1a  16
+
+
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT MAX(t2c)
+              FROM   t2
+              WHERE  t1b < t1d
+              ORDER BY min(t2c) LIMIT 1)
+-- !query schema
+struct<t1a:string,t1b:smallint>
+-- !query output
+val1b  8
+val1c  8
+
+
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT DISTINCT t2c
+              FROM   t2
+              WHERE  t1b < t1d
+              ORDER BY t2c LIMIT 1)
+-- !query schema
+struct<t1a:string,t1b:smallint>
+-- !query output
+val1a  16
+
+
 -- !query
 CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
 -- !query schema


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to