This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e5a46106bca [SPARK-48655][SQL] SPJ: Add tests for shuffle skipping 
for aggregate queries
7e5a46106bca is described below

commit 7e5a46106bcac80bc535f4042e3adcb9cbb2c491
Author: Szehon Ho <[email protected]>
AuthorDate: Fri Jun 21 10:56:47 2024 -0700

    [SPARK-48655][SQL] SPJ: Add tests for shuffle skipping for aggregate queries
    
    ### What changes were proposed in this pull request?
      Add unit test in KeyGroupedPartitionSuite to verify that aggregation can 
also skip shuffle if key matches partition key
    
    ### Why are the changes needed?
      This lacked test coverage
    
    ### Does this PR introduce _any_ user-facing change?
      No
    
    ### How was this patch tested?
      Ran unit test
    
    ### Was this patch authored or co-authored using generative AI tooling?
      No
    
    Closes #47015 from szehon-ho/spj_test.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Chao Sun <[email protected]>
---
 .../connector/KeyGroupedPartitioningSuite.scala    | 23 ++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index a5de5bc1913b..d77a6e8b8ac1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -298,6 +298,12 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
         Row("bbb", 20, 250.0), Row("bbb", 20, 350.0), Row("ccc", 30, 400.50)))
   }
 
+  private def collectAllShuffles(plan: SparkPlan): Seq[ShuffleExchangeExec] = {
+    collect(plan) {
+      case s: ShuffleExchangeExec => s
+    }
+  }
+
   private def collectShuffles(plan: SparkPlan): Seq[ShuffleExchangeExec] = {
     // here we skip collecting shuffle operators that are not associated with 
SMJ
     collect(plan) {
@@ -346,6 +352,23 @@ class KeyGroupedPartitioningSuite extends 
DistributionAndOrderingSuiteBase {
     Column.create("price", FloatType),
     Column.create("time", TimestampType))
 
+  test("SPARK-48655: group by on partition keys should not introduce 
additional shuffle") {
+    val items_partitions = Array(identity("id"))
+    createTable(items, itemsColumns, items_partitions)
+    sql(s"INSERT INTO testcat.ns.$items VALUES " +
+        s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+        s"(1, 'aa', 41.0, cast('2020-01-02' as timestamp)), " +
+        s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+        s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+    val df = sql(s"SELECT MAX(price) AS res FROM testcat.ns.$items GROUP BY 
id")
+    val shuffles = collectAllShuffles(df.queryExecution.executedPlan)
+    assert(shuffles.isEmpty,
+      "should contain shuffle when not grouping by partition values")
+
+    checkAnswer(df.sort("res"), Seq(Row(10.0), Row(15.5), Row(41.0)))
+  }
+
   test("partitioned join: join with two partition keys and matching & sorted 
partitions") {
     val items_partitions = Array(bucket(8, "id"), days("arrive_time"))
     createTable(items, itemsColumns, items_partitions)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to