This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7e5a46106bca [SPARK-48655][SQL] SPJ: Add tests for shuffle skipping
for aggregate queries
7e5a46106bca is described below
commit 7e5a46106bcac80bc535f4042e3adcb9cbb2c491
Author: Szehon Ho <[email protected]>
AuthorDate: Fri Jun 21 10:56:47 2024 -0700
[SPARK-48655][SQL] SPJ: Add tests for shuffle skipping for aggregate queries
### What changes were proposed in this pull request?
Add a unit test in KeyGroupedPartitioningSuite to verify that aggregation can
also skip the shuffle when the grouping key matches the partition key.
### Why are the changes needed?
This behavior previously lacked test coverage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran unit test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47015 from szehon-ho/spj_test.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
.../connector/KeyGroupedPartitioningSuite.scala | 23 ++++++++++++++++++++++
1 file changed, 23 insertions(+)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index a5de5bc1913b..d77a6e8b8ac1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -298,6 +298,12 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
Row("bbb", 20, 250.0), Row("bbb", 20, 350.0), Row("ccc", 30, 400.50)))
}
+  /** Returns every ShuffleExchangeExec in `plan`, not only the SMJ-related ones `collectShuffles` keeps. */
+  private def collectAllShuffles(plan: SparkPlan): Seq[ShuffleExchangeExec] =
+    collect(plan) {
+      case shuffle: ShuffleExchangeExec => shuffle
+    }
+
private def collectShuffles(plan: SparkPlan): Seq[ShuffleExchangeExec] = {
// here we skip collecting shuffle operators that are not associated with
SMJ
collect(plan) {
@@ -346,6 +352,23 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
Column.create("price", FloatType),
Column.create("time", TimestampType))
+  test("SPARK-48655: group by on partition keys should not introduce additional shuffle") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+        s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+        s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " +
+        s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+        s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    // `items` is partitioned by identity(id), so GROUP BY id is expected to reuse the scan's
+    // key-grouped partitioning: the executed plan must contain no ShuffleExchangeExec at all.
+    val df = sql(s"SELECT MAX(price) AS res FROM testcat.ns.$items GROUP BY id")
+    val shuffles = collectAllShuffles(df.queryExecution.executedPlan)
+    assert(shuffles.isEmpty, "should not contain shuffle when grouping by partition keys")
+    checkAnswer(df.sort("res"), Seq(Row(10.0), Row(15.5), Row(41.0)))
+  }
+
test("partitioned join: join with two partition keys and matching & sorted
partitions") {
val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
createTable(items, itemsColumns, items_partitions)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]