This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8fd915ffaba [SPARK-45584][SQL] Fix subquery execution failure with TakeOrderedAndProjectExec 8fd915ffaba is described below commit 8fd915ffaba1cc99813cc8d6d2a28688d7fae39b Author: allisonwang-db <allison.w...@databricks.com> AuthorDate: Fri Oct 20 08:36:42 2023 +0800 [SPARK-45584][SQL] Fix subquery execution failure with TakeOrderedAndProjectExec ### What changes were proposed in this pull request? This PR fixes a bug when there are subqueries in `TakeOrderedAndProjectExec`. The `executeCollect` method does not wait for subqueries to finish, which can result in an `IllegalArgumentException` when executing a simple query. For example, this query: ``` WITH t2 AS ( SELECT * FROM t1 ORDER BY id ) SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10 ``` will fail with this error: ``` java.lang.IllegalArgumentException: requirement failed: Subquery subquery#242, [id=#109] has not finished ``` ### Why are the changes needed? To fix a bug. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New unit test ### Was this patch authored or co-authored using generative AI tooling? No Closes #43419 from allisonwang-db/spark-45584-subquery-failure. 
Authored-by: allisonwang-db <allison.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../org/apache/spark/sql/execution/limit.scala | 2 +- .../scala/org/apache/spark/sql/SubquerySuite.scala | 24 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 877f6508d96..77135d21a26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -282,7 +282,7 @@ case class TakeOrderedAndProjectExec( projectList.map(_.toAttribute) } - override def executeCollect(): Array[InternalRow] = { + override def executeCollect(): Array[InternalRow] = executeQuery { val orderingSatisfies = SortOrder.orderingSatisfies(child.outputOrdering, sortOrder) val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val limited = if (orderingSatisfies) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 37fea85fcd2..4fab10c3d0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -2711,4 +2711,28 @@ class SubquerySuite extends QueryTest checkAnswer(df, Row(1, "foo", 1, "foo")) } } + + test("SPARK-45584: subquery execution should not fail with ORDER BY and LIMIT") { + withTable("t1") { + sql( + """ + |CREATE TABLE t1 USING PARQUET + |AS SELECT * FROM VALUES + |(1, "a"), + |(2, "a"), + |(3, "a") t(id, value) + |""".stripMargin) + val df = sql( + """ + |WITH t2 AS ( + | SELECT * FROM t1 ORDER BY id + |) + |SELECT *, (SELECT COUNT(*) FROM t2) FROM t2 LIMIT 10 + |""".stripMargin) + // This should not fail with IllegalArgumentException. 
+ checkAnswer( + df, + Row(1, "a", 3) :: Row(2, "a", 3) :: Row(3, "a", 3) :: Nil) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org