This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fd915ffaba [SPARK-45584][SQL] Fix subquery execution failure with 
TakeOrderedAndProjectExec
8fd915ffaba is described below

commit 8fd915ffaba1cc99813cc8d6d2a28688d7fae39b
Author: allisonwang-db <allison.w...@databricks.com>
AuthorDate: Fri Oct 20 08:36:42 2023 +0800

    [SPARK-45584][SQL] Fix subquery execution failure with 
TakeOrderedAndProjectExec
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a bug when there are subqueries in 
`TakeOrderedAndProjectExec`. The executeCollect method does not wait for 
subqueries to finish and it can result in IllegalArgumentException when 
executing a simple query.
    For example this query:
    ```
    WITH t2 AS (
      SELECT * FROM t1 ORDER BY id
    )
    SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
    ```
    will fail with this error
    ```
     java.lang.IllegalArgumentException: requirement failed: Subquery 
subquery#242, [id=#109] has not finished
    ```
    
    ### Why are the changes needed?
    
    To fix a bug.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43419 from allisonwang-db/spark-45584-subquery-failure.
    
    Authored-by: allisonwang-db <allison.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/execution/limit.scala     |  2 +-
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 877f6508d96..77135d21a26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -282,7 +282,7 @@ case class TakeOrderedAndProjectExec(
     projectList.map(_.toAttribute)
   }
 
-  override def executeCollect(): Array[InternalRow] = {
+  override def executeCollect(): Array[InternalRow] = executeQuery {
     val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, 
sortOrder)
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val limited = if (orderingSatisfies) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 37fea85fcd2..4fab10c3d0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2711,4 +2711,28 @@ class SubquerySuite extends QueryTest
       checkAnswer(df, Row(1, "foo", 1, "foo"))
     }
   }
+
+  test("SPARK-45584: subquery execution should not fail with ORDER BY and 
LIMIT") {
+    withTable("t1") {
+      sql(
+        """
+          |CREATE TABLE t1 USING PARQUET
+          |AS SELECT * FROM VALUES
+          |(1, "a"),
+          |(2, "a"),
+          |(3, "a") t(id, value)
+          |""".stripMargin)
+      val df = sql(
+        """
+          |WITH t2 AS (
+          |  SELECT * FROM t1 ORDER BY id
+          |)
+          |SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10
+          |""".stripMargin)
+      // This should not fail with IllegalArgumentException.
+      checkAnswer(
+        df,
+        Row(1, "a", 3) :: Row(2, "a", 3) :: Row(3, "a", 3) :: Nil)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to