This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 3b3eb6f  [SPARK-36489][SQL] Aggregate functions over no grouping keys, on tables with a single bucket, return multiple rows
3b3eb6f is described below
commit 3b3eb6f8eae8519d107e8ae7c634c61e9a2b5e5c
Author: IonutBoicuAms <[email protected]>
AuthorDate: Thu Aug 12 15:22:38 2021 +0800
[SPARK-36489][SQL] Aggregate functions over no grouping keys, on tables with a single bucket, return multiple rows
### What changes were proposed in this pull request?
This PR fixes a bug in `DisableUnnecessaryBucketedScan`.
When running any aggregate function, without any grouping keys, on a table
with a single bucket, multiple rows are returned.
This happens because the aggregate function satisfies the `AllTuples`
distribution, no `Exchange` will be planned, and the bucketed scan will be
disabled.
### Why are the changes needed?
Bug fixing. Aggregates over no grouping keys should return a single row.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added new test in `DisableUnnecessaryBucketedScanSuite`.
Closes #33711 from IonutBoicuAms/fix-bug-disableunnecessarybucketedscan.
Authored-by: IonutBoicuAms <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 2b665751d9c7e4fb07ea18ce6611328e24f3dce9)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../bucketing/DisableUnnecessaryBucketedScan.scala | 4 ++--
.../DisableUnnecessaryBucketedScanSuite.scala | 26 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
index 98bcab2..5bd70c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.bucketing
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, HashClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashClusteredDistribution}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
@@ -120,7 +120,7 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
private def hasInterestingPartition(plan: SparkPlan): Boolean = {
plan.requiredChildDistribution.exists {
-      case _: ClusteredDistribution | _: HashClusteredDistribution => true
+      case _: ClusteredDistribution | _: HashClusteredDistribution | AllTuples => true
case _ => false
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
index 1a19824..737cffc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
@@ -258,4 +258,30 @@ abstract class DisableUnnecessaryBucketedScanSuite
}
}
}
+
+  test("Aggregates with no groupby over tables having 1 BUCKET, return multiple rows") {
+ withTable("t1") {
+ withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
+ sql(
+ """
+ |CREATE TABLE t1 (`id` BIGINT, `event_date` DATE)
+ |USING PARQUET
+ |CLUSTERED BY (id)
+ |INTO 1 BUCKETS
+ |""".stripMargin)
+ sql(
+ """
+ |INSERT INTO TABLE t1 VALUES(1.23, cast("2021-07-07" as date))
+ |""".stripMargin)
+ sql(
+ """
+ |INSERT INTO TABLE t1 VALUES(2.28, cast("2021-08-08" as date))
+ |""".stripMargin)
+ val df = spark.sql("select sum(id) from t1 where id is not null")
+ assert(df.count == 1)
+        checkDisableBucketedScan(query = "SELECT SUM(id) FROM t1 WHERE id is not null",
+          expectedNumScanWithAutoScanEnabled = 1, expectedNumScanWithAutoScanDisabled = 1)
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]