This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 73a2b84ad924 [SPARK-47094][SQL][TEST][FOLLOWUP] SPJ : fix bucket reducer function
73a2b84ad924 is described below

commit 73a2b84ad924cfa26fec17f066a87ad19d679ef4
Author: himadripal <[email protected]>
AuthorDate: Wed Oct 30 09:58:26 2024 -0700

    [SPARK-47094][SQL][TEST][FOLLOWUP] SPJ : fix bucket reducer function
    
    ### What changes were proposed in this pull request?
    The SPJ compatible-buckets support (SPARK-47094) includes an implementation of a reducible bucket function. This patch fixes that implementation to match the one in Apache Iceberg.
    
    ### Why are the changes needed?
    With this fix, when the two sides have incompatible numbers of buckets (their GCD is 1), no reducer is returned, so the buckets are no longer all collapsed into a single bucket.
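    
    As a minimal sketch of the fixed check (the reducerDivisor helper is
    illustrative only, not Spark's API; the real change is in
    BucketFunction.reducer in the diff below):
    
        // GCD of the two bucket counts, computed as in the patch.
        def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
    
        // Yields Some(divisor) only when a reduction is both possible and
        // useful; gcd == 1 means incompatible counts, so no reducer is built.
        def reducerDivisor(thisNumBuckets: Int, otherNumBuckets: Int): Option[Int] = {
          val g = gcd(thisNumBuckets, otherNumBuckets)
          if (g > 1 && g != thisNumBuckets) Some(g) else None
        }
    
        reducerDivisor(4, 6) // Some(2): buckets reduce via bucket % 2
        reducerDivisor(2, 3) // None: the old code built a reducer with bucket % 1 == 0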
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    With unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47126 from himadripal/fix_spj_transform_expression.
    
    Authored-by: himadripal <[email protected]>
    Signed-off-by: huaxingao <[email protected]>
---
 .../connector/KeyGroupedPartitioningSuite.scala    | 63 +++++++++++++++++++---
 .../catalog/functions/transformFunctions.scala     |  6 +--
 2 files changed, 59 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 14598f243785..152896499010 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1390,7 +1390,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
-  test("SPARK-44647: test join key is subset of cluster key " +
+  test("SPARK-44647: SPJ: test join key is subset of cluster key " +
       "with push values and partially-clustered") {
     val table1 = "tab1e1"
     val table2 = "table2"
@@ -1487,7 +1487,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
-  test("SPARK-47094: Support compatible buckets") {
+  test("SPARK-47094: SPJ: Support compatible buckets") {
     val table1 = "tab1e1"
     val table2 = "table2"
 
@@ -1580,11 +1580,11 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
             val shuffles = collectShuffles(df.queryExecution.executedPlan)
             assert(shuffles.isEmpty, "SPJ should be triggered")
 
-            val scans = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+            val partitions = collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
               partitions.length)
             val expectedBuckets = Math.min(table1buckets1, table2buckets1) *
               Math.min(table1buckets2, table2buckets2)
-            assert(scans == Seq(expectedBuckets, expectedBuckets))
+            assert(partitions == Seq(expectedBuckets, expectedBuckets))
 
             checkAnswer(df, Seq(
               Row(0, 0, "aa", "aa"),
@@ -1647,7 +1647,7 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
-  test("SPARK-47094: Support compatible buckets with common divisor") {
+  test("SPARK-47094: SPJ:Support compatible buckets with common divisor") {
     val table1 = "tab1e1"
     val table2 = "table2"
 
@@ -1744,9 +1744,9 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
               partitions.length)
 
             def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
-            val expectedBuckets = gcd(table1buckets1, table2buckets1) *
+            val expectedPartitions = gcd(table1buckets1, table2buckets1) *
               gcd(table1buckets2, table2buckets2)
-            assert(scans == Seq(expectedBuckets, expectedBuckets))
+            assert(scans == Seq(expectedPartitions, expectedPartitions))
 
             checkAnswer(df, Seq(
               Row(0, 0, "aa", "aa"),
@@ -1809,6 +1809,55 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
     }
   }
 
+  test("SPARK-47094: SPJ: Does not trigger when incompatible number of buckets 
on both side") {
+    val table1 = "tab1e1"
+    val table2 = "table2"
+
+    Seq(
+      (2, 3),
+      (3, 4)
+    ).foreach {
+      case (table1buckets1, table2buckets1) =>
+        catalog.clearTables()
+
+        val partition1 = Array(bucket(table1buckets1, "store_id"))
+        val partition2 = Array(bucket(table2buckets1, "store_id"))
+
+        Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
+          createTable(tab, columns2, part)
+          val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
+            "(0, 0, 'aa'), " +
+            "(1, 0, 'ab'), " + // duplicate partition key
+            "(2, 2, 'ac'), " +
+            "(3, 3, 'ad'), " +
+            "(4, 2, 'bc') "
+
+          sql(insertStr)
+        }
+
+        Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+          withSQLConf(
+            SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+            SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+            SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+            SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+              allowJoinKeysSubsetOfPartitionKeys.toString,
+            SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+            val df = sql(
+              s"""
+                 |${selectWithMergeJoinHint("t1", "t2")}
+                 |t1.store_id, t1.dept_id, t1.data, t2.data
+                 |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+                 |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+                 |""".stripMargin)
+
+            val shuffles = collectShuffles(df.queryExecution.executedPlan)
+            assert(shuffles.nonEmpty, "SPJ should not be triggered")
+          }
+        }
+    }
+  }
+
   test("SPARK-47094: Support compatible buckets with less join keys than 
partition keys") {
     val table1 = "tab1e1"
     val table2 = "table2"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
index 5364fc5d6242..b82cc2392e1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
@@ -101,8 +101,8 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, In
 
     if (otherFunc == BucketFunction) {
       val gcd = this.gcd(thisNumBuckets, otherNumBuckets)
-      if (gcd != thisNumBuckets) {
-        return BucketReducer(thisNumBuckets, gcd)
+      if (gcd > 1 && gcd != thisNumBuckets) {
+        return BucketReducer(gcd)
       }
     }
     null
@@ -111,7 +111,7 @@ object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, In
   private def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
 }
 
-case class BucketReducer(thisNumBuckets: Int, divisor: Int) extends Reducer[Int, Int] {
+case class BucketReducer(divisor: Int) extends Reducer[Int, Int] {
   override def reduce(bucket: Int): Int = bucket % divisor
 }
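
To see the effect of the fix in isolation, a small illustrative sketch
(BucketReducer here mirrors the patched class but drops the Reducer[Int, Int]
parent so it runs standalone; the bucket counts are made up):

    case class BucketReducer(divisor: Int) {
      // Map a bucket ID onto the coarser bucketing shared by both sides.
      def reduce(bucket: Int): Int = bucket % divisor
    }

    // Compatible counts 4 and 6: gcd = 2, so each side reduces to 2 groups.
    val reducer = BucketReducer(BigInt(4).gcd(BigInt(6)).toInt)
    (0 until 4).map(reducer.reduce)  // Vector(0, 1, 0, 1)

    // Incompatible counts 2 and 3: gcd = 1. The old code returned
    // BucketReducer(1), mapping every bucket to 0; with the gcd > 1 guard,
    // no reducer is returned and SPJ is (correctly) not triggered.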
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
