This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 73a2b84ad924 [SPARK-47094][SQL][TEST][FOLLOWUP] SPJ : fix bucket
reducer function
73a2b84ad924 is described below
commit 73a2b84ad924cfa26fec17f066a87ad19d679ef4
Author: himadripal <[email protected]>
AuthorDate: Wed Oct 30 09:58:26 2024 -0700
[SPARK-47094][SQL][TEST][FOLLOWUP] SPJ : fix bucket reducer function
### What changes were proposed in this pull request?
The SPJ compatible-buckets feature (SPARK-47094) included an implementation of a reducible bucket function.
This patch fixes that implementation to match the one in Apache Iceberg.
### Why are the changes needed?
With this fix, when the two bucket counts are incompatible (their GCD is 1), no reducer is
returned, so the buckets are no longer collapsed to a single bucket and SPJ is correctly not triggered.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
With unit tests
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47126 from himadripal/fix_spj_transform_expression.
Authored-by: himadripal <[email protected]>
Signed-off-by: huaxingao <[email protected]>
---
.../connector/KeyGroupedPartitioningSuite.scala | 63 +++++++++++++++++++---
.../catalog/functions/transformFunctions.scala | 6 +--
2 files changed, 59 insertions(+), 10 deletions(-)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 14598f243785..152896499010 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -1390,7 +1390,7 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
}
}
- test("SPARK-44647: test join key is subset of cluster key " +
+ test("SPARK-44647: SPJ: test join key is subset of cluster key " +
"with push values and partially-clustered") {
val table1 = "tab1e1"
val table2 = "table2"
@@ -1487,7 +1487,7 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
}
}
- test("SPARK-47094: Support compatible buckets") {
+ test("SPARK-47094: SPJ: Support compatible buckets") {
val table1 = "tab1e1"
val table2 = "table2"
@@ -1580,11 +1580,11 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
val shuffles = collectShuffles(df.queryExecution.executedPlan)
assert(shuffles.isEmpty, "SPJ should be triggered")
- val scans =
collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
+ val partions =
collectScans(df.queryExecution.executedPlan).map(_.inputRDD.
partitions.length)
val expectedBuckets = Math.min(table1buckets1, table2buckets1) *
Math.min(table1buckets2, table2buckets2)
- assert(scans == Seq(expectedBuckets, expectedBuckets))
+ assert(partions == Seq(expectedBuckets, expectedBuckets))
checkAnswer(df, Seq(
Row(0, 0, "aa", "aa"),
@@ -1647,7 +1647,7 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
}
}
- test("SPARK-47094: Support compatible buckets with common divisor") {
+ test("SPARK-47094: SPJ:Support compatible buckets with common divisor") {
val table1 = "tab1e1"
val table2 = "table2"
@@ -1744,9 +1744,9 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
partitions.length)
def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
- val expectedBuckets = gcd(table1buckets1, table2buckets1) *
+ val expectedPartitions = gcd(table1buckets1, table2buckets1) *
gcd(table1buckets2, table2buckets2)
- assert(scans == Seq(expectedBuckets, expectedBuckets))
+ assert(scans == Seq(expectedPartitions, expectedPartitions))
checkAnswer(df, Seq(
Row(0, 0, "aa", "aa"),
@@ -1809,6 +1809,55 @@ class KeyGroupedPartitioningSuite extends
DistributionAndOrderingSuiteBase {
}
}
+ test("SPARK-47094: SPJ: Does not trigger when incompatible number of buckets
on both side") {
+ val table1 = "tab1e1"
+ val table2 = "table2"
+
+ Seq(
+ (2, 3),
+ (3, 4)
+ ).foreach {
+ case (table1buckets1, table2buckets1) =>
+ catalog.clearTables()
+
+ val partition1 = Array(bucket(table1buckets1, "store_id"))
+ val partition2 = Array(bucket(table2buckets1, "store_id"))
+
+ Seq((table1, partition1), (table2, partition2)).foreach { case (tab,
part) =>
+ createTable(tab, columns2, part)
+ val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
+ "(0, 0, 'aa'), " +
+ "(1, 0, 'ab'), " + // duplicate partition key
+ "(2, 2, 'ac'), " +
+ "(3, 3, 'ad'), " +
+ "(4, 2, 'bc') "
+
+ sql(insertStr)
+ }
+
+ Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+ withSQLConf(
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+ SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key
-> "false",
+ SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key
->
+ allowJoinKeysSubsetOfPartitionKeys.toString,
+ SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+ val df = sql(
+ s"""
+ |${selectWithMergeJoinHint("t1", "t2")}
+ |t1.store_id, t1.dept_id, t1.data, t2.data
+ |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+ |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+ |""".stripMargin)
+
+ val shuffles = collectShuffles(df.queryExecution.executedPlan)
+ assert(shuffles.nonEmpty, "SPJ should not be triggered")
+ }
+ }
+ }
+ }
+
test("SPARK-47094: Support compatible buckets with less join keys than
partition keys") {
val table1 = "tab1e1"
val table2 = "table2"
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
index 5364fc5d6242..b82cc2392e1f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
@@ -101,8 +101,8 @@ object BucketFunction extends ScalarFunction[Int] with
ReducibleFunction[Int, In
if (otherFunc == BucketFunction) {
val gcd = this.gcd(thisNumBuckets, otherNumBuckets)
- if (gcd != thisNumBuckets) {
- return BucketReducer(thisNumBuckets, gcd)
+ if (gcd > 1 && gcd != thisNumBuckets) {
+ return BucketReducer(gcd)
}
}
null
@@ -111,7 +111,7 @@ object BucketFunction extends ScalarFunction[Int] with
ReducibleFunction[Int, In
private def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
}
-case class BucketReducer(thisNumBuckets: Int, divisor: Int) extends
Reducer[Int, Int] {
+case class BucketReducer(divisor: Int) extends Reducer[Int, Int] {
override def reduce(bucket: Int): Int = bucket % divisor
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]