This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 17b71239fc1 [SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge

17b71239fc1 is described below

commit 17b71239fc10d620ac41cbfc1a2a984adc08da82
Author: Peter Toth <peter.t...@gmail.com>
AuthorDate: Mon Feb 6 20:36:57 2023 +0800

[SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge

### What changes were proposed in this pull request?

Unfortunately, https://github.com/apache/spark/pull/32298 introduced a regression from Spark 3.2 to 3.3. After that change, a merged subquery can contain multiple distinct aggregates with different arguments, for example `count(distinct c1)` and `count(distinct c2)`. Such aggregates must be rewritten by the `RewriteDistinctAggregates` rule before the physical planner can handle them. This PR fixes that by also running `RewriteDistinctAggregates` right after `MergeScalarSubqueries`, in the same optimizer batch.

### Why are the changes needed?

The following query:

```
SELECT (SELECT count(distinct c1) FROM t1), (SELECT count(distinct c2) FROM t1)
```

currently fails with:

```
java.lang.IllegalStateException: You hit a query analyzer bug. Please report your query to Spark user mailing list.
  at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:538)
```

but works again after this PR.

### Does this PR introduce _any_ user-facing change?

Yes, the above query works again.

### How was this patch tested?

Added a new UT.

Closes #39887 from peter-toth/SPARK-42346-rewrite-distinct-aggregates-after-subquery-merge.
Authored-by: Peter Toth <peter.t...@gmail.com>
Signed-off-by: Yuming Wang <yumw...@ebay.com>
(cherry picked from commit 5940b9884b4b172f65220da7857d2952b137bc51)
Signed-off-by: Yuming Wang <yumw...@ebay.com>

---
 .../spark/sql/execution/SparkOptimizer.scala       |  3 ++-
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 25 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index b8861715726..416865976df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -55,7 +55,8 @@ class SparkOptimizer(
       InjectRuntimeFilter,
       RewritePredicateSubquery) :+
     Batch("MergeScalarSubqueries", Once,
-      MergeScalarSubqueries) :+
+      MergeScalarSubqueries,
+      RewriteDistinctAggregates) :+
     Batch("Pushdown Filters from PartitionPruning", fixedPoint,
       PushDownPredicates) :+
     Batch("Cleanup filters that cannot be pushed down", Once,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 0975772fb90..cedc68cfc84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2269,4 +2269,29 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       assert(findProject(df2).size == 3)
     }
   }
+
+  test("SPARK-42346: Rewrite distinct aggregates after merging subqueries") {
+    withTempView("t1") {
+      Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")
+
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) FROM t1),
+          |  (SELECT count(distinct c2) FROM t1)
+          |""".stripMargin),
+        Row(2, 2))
+
+      // In this case we don't merge the subqueries as `RewriteDistinctAggregates` kicks off for the
+      // 2 subqueries first but `MergeScalarSubqueries` is not prepared for the `Expand` nodes that
+      // are inserted by the rewrite.
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) + sum(distinct c2) FROM t1),
+          |  (SELECT count(distinct c2) + sum(distinct c1) FROM t1)
+          |""".stripMargin),
+        Row(8, 6))
+    }
+  }
 }
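To make the rewrite more concrete, here is a plain-Scala sketch, with no Spark dependency, of the Expand-based strategy that `RewriteDistinctAggregates` applies to an aggregate with several distinct groups. Every input row is replicated once per distinct group and tagged with a group id, a first aggregation on (group id, value) removes duplicates, and a second aggregation evaluates each distinct aggregate over its own group's rows. All names (`ExpandRewriteSketch`, `expand`, `query1`, `query2`, ...) are hypothetical, and the model is deliberately simplified: two `Int` columns, no NULLs, and no non-distinct aggregates. It mimics the idea only and is not Spark's implementation.

```scala
// Hypothetical plain-Scala model of the Expand-based distinct-aggregate rewrite.
// Not Spark code: no NULL handling, no regular (non-distinct) aggregates.
object ExpandRewriteSketch {
  type Row = (Int, Int) // (c1, c2), like the rows of the test view t1

  // Expand: emit one copy of every row per distinct group, tagged with a group id.
  // gid 1 carries c1, gid 2 carries c2.
  def expand(rows: Seq[Row]): Seq[(Int, Int)] =
    rows.flatMap { case (c1, c2) => Seq((1, c1), (2, c2)) }

  // First aggregate: grouping by (gid, value) removes duplicates inside each group.
  def dedupe(expanded: Seq[(Int, Int)]): Seq[(Int, Int)] = expanded.distinct

  // Second aggregate: each distinct aggregate only sees the rows tagged with its gid.
  private def values(gid: Int, deduped: Seq[(Int, Int)]): Seq[Long] =
    deduped.collect { case (g, v) if g == gid => v.toLong }

  def countDistinct(gid: Int, deduped: Seq[(Int, Int)]): Long =
    values(gid, deduped).size.toLong

  def sumDistinct(gid: Int, deduped: Seq[(Int, Int)]): Long =
    values(gid, deduped).sum

  // First test query: after merging, one aggregate holds both distinct groups.
  def query1(t1: Seq[Row]): (Long, Long) = {
    val d = dedupe(expand(t1))
    (countDistinct(1, d), countDistinct(2, d))
  }

  // Second test query: each subquery already has two distinct groups,
  // so each one is rewritten and evaluated on its own.
  def query2(t1: Seq[Row]): (Long, Long) = {
    val sub1 = { val d = dedupe(expand(t1)); countDistinct(1, d) + sumDistinct(2, d) }
    val sub2 = { val d = dedupe(expand(t1)); countDistinct(2, d) + sumDistinct(1, d) }
    (sub1, sub2)
  }
}
```

With `Seq((1, 2), (3, 4))` as input, `query1` returns `(2, 2)` and `query2` returns `(8, 6)`, the same values the new test expects. The `dedupe` step is what turns plain `count`/`sum` into their `distinct` variants: for input `Seq((1, 2), (1, 4))`, the duplicate `(1, 1)` produced by `expand` is dropped before counting.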
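The fix itself is about rule order, which can be illustrated with a toy model. This is a hypothetical sketch rather than Catalyst's API. Each scalar subquery is reduced to the argument sets of its distinct aggregates. The model assumes that merging succeeds whenever no subquery contains an Expand, whereas the real `MergeScalarSubqueries` also requires the subqueries to read the same data. It also models the planner check behind the `IllegalStateException` as rejecting any aggregate with more than one distinct group. Running the rewrite again after the merge, as the updated `MergeScalarSubqueries` batch does, lets the first test query plan. In the second query, the subqueries are rewritten before merging and are therefore never merged, which matches the comment in the test.

```scala
// Hypothetical toy model of optimizer rule order; not Catalyst's LogicalPlan API.
object RuleOrderSketch {
  sealed trait Plan
  // An aggregate whose distinct aggregates take the given argument sets,
  // e.g. count(distinct c1) contributes Set("c1").
  final case class Agg(distinctArgs: List[Set[String]]) extends Plan
  // An aggregate that has already been rewritten through an Expand node.
  final case class Expanded(original: Agg) extends Plan

  type Rule = List[Plan] => List[Plan]

  // Models RewriteDistinctAggregates (simplified): rewrite an aggregate only
  // when its distinct aggregates use more than one distinct argument set.
  val rewriteDistinct: Rule = _.map {
    case a: Agg if a.distinctArgs.distinct.size > 1 => Expanded(a)
    case other                                      => other
  }

  // Models MergeScalarSubqueries (simplified): merge plain aggregates into one,
  // but leave the subqueries alone if any of them already contains an Expand.
  val mergeSubqueries: Rule = plans =>
    if (plans.size > 1 && plans.forall(_.isInstanceOf[Agg]))
      List(Agg(plans.collect { case a: Agg => a.distinctArgs }.flatten))
    else plans

  // Models the planner sanity check that rejects several distinct groups.
  def plan(plans: List[Plan]): List[Plan] = {
    plans.foreach {
      case a: Agg if a.distinctArgs.distinct.size > 1 =>
        throw new IllegalStateException("You hit a query analyzer bug.")
      case _ => ()
    }
    plans
  }

  // Apply the rules in order, then run the planner check.
  def run(rules: List[Rule], subqueries: List[Plan]): List[Plan] =
    plan(rules.foldLeft(subqueries)((ps, rule) => rule(ps)))

  // Relevant rule order before and after the fix.
  val beforeFix: List[Rule] = List(rewriteDistinct, mergeSubqueries)
  val afterFix: List[Rule]  = List(rewriteDistinct, mergeSubqueries, rewriteDistinct)
}
```

In this model, the first test query (`List(Agg(List(Set("c1"))), Agg(List(Set("c2"))))`) fails the planner check under `beforeFix`, because merging produces a single aggregate with two distinct groups. Under `afterFix` it ends up as one `Expanded` aggregate. The second query's subqueries are both `Expanded` by the first rewrite, so `mergeSubqueries` leaves them as two separate plans.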