This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 37d6b3c [SPARK-32761][SQL][3.0] Allow aggregating multiple foldable
distinct expressions
37d6b3c is described below
commit 37d6b3c0fafa98922ed1ecf4f8634d962f5bb9d9
Author: Linhong Liu <[email protected]>
AuthorDate: Fri Oct 16 03:36:21 2020 +0000
[SPARK-32761][SQL][3.0] Allow aggregating multiple foldable distinct
expressions
### What changes were proposed in this pull request?
For queries with multiple foldable distinct columns, since they will be
eliminated during
execution, it's not mandatory to let `RewriteDistinctAggregates` handle
this case. And
in the current code, `RewriteDistinctAggregates` *dose* miss some
"aggregating with
multiple foldable distinct expressions" cases.
For example: `select count(distinct 2), count(distinct 2, 3)` will be
missed.
But in the planner, this will trigger an error that "multiple distinct
expressions" are not allowed.
As the foldable distinct columns can be eliminated finally, we can allow
this in the aggregation
planner check.
### Why are the changes needed?
bug fix
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
added test case
Authored-by: Linhong Liu <linhong.liudatabricks.com>
Signed-off-by: Wenchen Fan <wenchendatabricks.com>
(cherry picked from commit a410658c9bc244e325702dc926075bd835b669ff)
Closes #30052 from linhongliu-db/SPARK-32761-3.0.
Authored-by: Linhong Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 6 ++++--
sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 4 ++++
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f836deb..689d1eb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -517,7 +517,8 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
val (functionsWithDistinct, functionsWithoutDistinct) =
aggregateExpressions.partition(_.isDistinct)
- if
(functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length
> 1) {
+ if (functionsWithDistinct.map(
+
_.aggregateFunction.children.filterNot(_.foldable).toSet).distinct.length > 1) {
// This is a sanity check. We should not reach here when we have
multiple distinct
// column sets. Our `RewriteDistinctAggregates` should take care
this case.
sys.error("You hit a query analyzer bug. Please report your query to
" +
@@ -548,7 +549,8 @@ abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
// to be [COUNT(DISTINCT foo), MAX(DISTINCT foo)], but
// [COUNT(DISTINCT bar), COUNT(DISTINCT foo)] is disallowed
because those two distinct
// aggregates have different column expressions.
- val distinctExpressions =
functionsWithDistinct.head.aggregateFunction.children
+ val distinctExpressions =
+
functionsWithDistinct.head.aggregateFunction.children.filterNot(_.foldable)
val normalizedNamedDistinctExpressions = distinctExpressions.map {
e =>
// Ideally this should be done in `NormalizeFloatingNumbers`,
but we do it here
// because `distinctExpressions` is not extracted during logical
phase.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 7869005..85cbe45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2467,6 +2467,10 @@ class DataFrameSuite extends QueryTest
val df = l.join(r, $"col2" === $"col4", "LeftOuter")
checkAnswer(df, Row("2", "2"))
}
+
+ test("SPARK-32761: aggregating multiple distinct CONSTANT columns") {
+ checkAnswer(sql("select count(distinct 2), count(distinct 2,3)"), Row(1,
1))
+ }
}
case class GroupByKey(a: Int, b: Int)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]