This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new cb36e748ed [KYUUBI #6989] Calculate expected join partitions based on scanned table size
cb36e748ed is described below
commit cb36e748ed8c74f0ab7a3f2b2cea02d24242f4b0
Author: wforget <[email protected]>
AuthorDate: Tue Mar 18 20:23:35 2025 +0800
[KYUUBI #6989] Calculate expected join partitions based on scanned table size
### Why are the changes needed?
Avoid unstable test cases caused by table size changes, which are likely to happen when upgrading Parquet/ORC/Spark.
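Instead of hard-coding the expected join partition count, the test now derives it from the catalog statistics of the scanned tables. A minimal sketch of the calculation, assuming a live `spark` session and tables that have been ANALYZEd as in the suite (the `tableSizeInBytes` helper is only for illustration):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Read the size recorded by ANALYZE TABLE ... COMPUTE STATISTICS.
def tableSizeInBytes(name: String): Long =
  spark.sessionState.catalog
    .getTableMetadata(TableIdentifier(name))
    .stats.get.sizeInBytes.toLong

val advisoryPartitionSizeInBytes = 500L
val scanSize = tableSizeInBytes("table1") + tableSizeInBytes("table2")
// One partition per advisory-sized chunk of scanned data, rounded up.
val expectedJoinPartitionNum = Math.ceil(scanSize.toDouble / advisoryPartitionSizeInBytes)
```

With the sizes previously noted in the test comments (7369 and 3287 bytes), this gives ceil(10656 / 500) = 22, the value that was hard-coded before; when an upgrade changes the file sizes, the expectation now moves with them.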
### How was this patch tested?
unit test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #6989 from wForget/minor_fix.
Closes #6989
9cdd36973 [wforget] address comments
f79fcca0d [wforget] Calculate expected join partitions based on scanned table size
Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/sql/DynamicShufflePartitionsSuite.scala | 61 ++++++++++++++--------
1 file changed, 38 insertions(+), 23 deletions(-)
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DynamicShufflePartitionsSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DynamicShufflePartitionsSuite.scala
index 6668675a5f..6d252671d3 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DynamicShufflePartitionsSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/DynamicShufflePartitionsSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.{CommandResultExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec}
@@ -51,6 +52,15 @@ class DynamicShufflePartitionsSuite extends KyuubiSparkSQLExtensionTest {
sql("ANALYZE TABLE table2 COMPUTE STATISTICS")
val initialPartitionNum: Int = 2
+ val advisoryPartitionSizeInBytes: Long = 500
+
+ val t1Size = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table1"))
+ .stats.get.sizeInBytes.toLong
+ val t2Size = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table2"))
+ .stats.get.sizeInBytes.toLong
+ val scanSize = t1Size + t2Size
+ val expectedJoinPartitionNum = Math.ceil(scanSize.toDouble / advisoryPartitionSizeInBytes)
+
Seq(false, true).foreach { dynamicShufflePartitions =>
val maxDynamicShufflePartitions = if (dynamicShufflePartitions) {
Seq(8, 2000)
@@ -63,34 +73,37 @@ class DynamicShufflePartitionsSuite extends KyuubiSparkSQLExtensionTest {
DYNAMIC_SHUFFLE_PARTITIONS_MAX_NUM.key -> maxDynamicShufflePartitionNum.toString,
AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> initialPartitionNum.toString,
- ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "500") {
+ ADVISORY_PARTITION_SIZE_IN_BYTES.key -> advisoryPartitionSizeInBytes.toString) {
val df = sql("insert overwrite table3 " +
" select a.c1 as c1, b.c2 as c2 from table1 a join table2 b on
a.c1 = b.c1")
val exchanges = collectExchanges(df.queryExecution.executedPlan)
val (joinExchanges, rebalanceExchanges) = exchanges
.partition(_.shuffleOrigin == ENSURE_REQUIREMENTS)
- // table scan size: 7369 3287
assert(joinExchanges.size == 2)
if (dynamicShufflePartitions) {
- joinExchanges.foreach(e =>
- assert(e.outputPartitioning.numPartitions
- == Math.min(22, maxDynamicShufflePartitionNum)))
+ joinExchanges.foreach { e =>
+ val expected = Math.min(expectedJoinPartitionNum, maxDynamicShufflePartitionNum)
+ assert(e.outputPartitioning.numPartitions == expected)
+ }
} else {
- joinExchanges.foreach(e =>
- assert(e.outputPartitioning.numPartitions == initialPartitionNum))
+ joinExchanges.foreach { e =>
+ assert(e.outputPartitioning.numPartitions == initialPartitionNum)
+ }
}
assert(rebalanceExchanges.size == 1)
if (dynamicShufflePartitions) {
if (maxDynamicShufflePartitionNum == 8) {
- // shuffle query size: 1424 451
- assert(rebalanceExchanges.head.outputPartitioning.numPartitions ==
- Math.min(4, maxDynamicShufflePartitionNum))
+ // shuffle query size: 1424 451 (the size may change with spark version updates
+ // or shuffle configuration updates)
+ val expected = Math.min(4, maxDynamicShufflePartitionNum)
+ assert(rebalanceExchanges.head.outputPartitioning.numPartitions == expected)
} else {
- // shuffle query size: 2057 664
- assert(rebalanceExchanges.head.outputPartitioning.numPartitions ==
- Math.min(6, maxDynamicShufflePartitionNum))
+ // shuffle query size: 2057 664 (the size may change with spark version updates
+ // or shuffle configuration updates)
+ val expected = Math.min(6, maxDynamicShufflePartitionNum)
+ assert(rebalanceExchanges.head.outputPartitioning.numPartitions == expected)
}
} else {
assert(
@@ -104,7 +117,7 @@ class DynamicShufflePartitionsSuite extends KyuubiSparkSQLExtensionTest {
DYNAMIC_SHUFFLE_PARTITIONS_MAX_NUM.key -> maxDynamicShufflePartitionNum.toString,
AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> initialPartitionNum.toString,
- ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "500",
+ ADVISORY_PARTITION_SIZE_IN_BYTES.key -> advisoryPartitionSizeInBytes.toString,
CONVERT_METASTORE_PARQUET.key -> "false") {
val df = sql("insert overwrite table3 " +
" select a.c1 as c1, b.c2 as c2 from table1 a join table2 b on
a.c1 = b.c1")
@@ -112,21 +125,23 @@ class DynamicShufflePartitionsSuite extends
KyuubiSparkSQLExtensionTest {
val exchanges = collectExchanges(df.queryExecution.executedPlan)
val (joinExchanges, rebalanceExchanges) = exchanges
.partition(_.shuffleOrigin == ENSURE_REQUIREMENTS)
- // table scan size: 7369 3287
assert(joinExchanges.size == 2)
if (dynamicShufflePartitions) {
- joinExchanges.foreach(e =>
- assert(e.outputPartitioning.numPartitions ==
- Math.min(22, maxDynamicShufflePartitionNum)))
+ joinExchanges.foreach { e =>
+ val expected = Math.min(expectedJoinPartitionNum, maxDynamicShufflePartitionNum)
+ assert(e.outputPartitioning.numPartitions == expected)
+ }
} else {
- joinExchanges.foreach(e =>
- assert(e.outputPartitioning.numPartitions == initialPartitionNum))
+ joinExchanges.foreach { e =>
+ assert(e.outputPartitioning.numPartitions == initialPartitionNum)
+ }
}
- // shuffle query size: 5154 720
+ // shuffle query size: 5154 720 (the size may change with spark version updates
+ // or shuffle configuration updates)
assert(rebalanceExchanges.size == 1)
if (dynamicShufflePartitions) {
- assert(rebalanceExchanges.head.outputPartitioning.numPartitions
- == Math.min(12, maxDynamicShufflePartitionNum))
+ val expected = Math.min(12, maxDynamicShufflePartitionNum)
+ assert(rebalanceExchanges.head.outputPartitioning.numPartitions == expected)
} else {
assert(rebalanceExchanges.head.outputPartitioning.numPartitions ==
initialPartitionNum)