This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ae75efeb407f [SPARK-51505][SQL] Always show empty partition number metrics in AQEShuffleReadExec
ae75efeb407f is described below
commit ae75efeb407fd3a5aa924d85c2471dcd32028063
Author: ziqi liu <[email protected]>
AuthorDate: Thu Jul 24 12:27:30 2025 +0800
[SPARK-51505][SQL] Always show empty partition number metrics in AQEShuffleReadExec
### What changes were proposed in this pull request?
A follow-up to https://github.com/apache/spark/pull/50273: always show the empty partition number metric in AQEShuffleReadExec.
### Why are the changes needed?
Even when there is no coalescing, we still want to know how many empty partitions there are: imagine a query with shuffle skew where every non-empty partition is large enough that no coalescing happens, yet many partitions are empty.
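For illustration only (this sketch is not part of the patch), the empty-partition count can be derived from the per-partition byte sizes reported by the shuffle's map output statistics, mirroring the `bytesByPartitionId.count(_ == 0)` logic added in the diff below; `countEmptyPartitions` is a hypothetical helper and the byte array is made-up data:

```scala
// Hypothetical helper, for illustration: count reducer partitions that
// received no shuffle data, given per-partition output sizes in bytes
// (the same quantity that MapOutputStatistics.bytesByPartitionId exposes).
def countEmptyPartitions(bytesByPartitionId: Option[Array[Long]]): Int =
  bytesByPartitionId.map(_.count(_ == 0L)).getOrElse(0)

// Example: 5 reducer partitions, two of which received no data at all.
assert(countEmptyPartitions(Some(Array(1024L, 0L, 512L, 0L, 2048L))) == 2)
```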
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Update UT
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #51608 from liuzqt/SPARK-51505.
Authored-by: ziqi liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/adaptive/AQEShuffleReadExec.scala | 27 +++++++++++-----------
.../execution/CoalesceShufflePartitionsSuite.scala | 18 +++++++--------
.../adaptive/AdaptiveQueryExecSuite.scala | 2 +-
3 files changed, 23 insertions(+), 24 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index e8b70f94a769..2a600b31cc29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -178,6 +178,15 @@ case class AQEShuffleReadExec private(
     numPartitionsMetric.set(partitionSpecs.length)
     driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong)
 
+    val numEmptyPartitionsMetric = metrics("numEmptyPartitions")
+    val numEmptyPartitions = child match {
+      case s: ShuffleQueryStageExec =>
+        s.mapStats.map(stats => stats.bytesByPartitionId.count(_ == 0)).getOrElse(0)
+      case _ => 0
+    }
+    numEmptyPartitionsMetric.set(numEmptyPartitions)
+    driverAccumUpdates += (numEmptyPartitionsMetric.id -> numEmptyPartitions.toLong)
+
     if (hasSkewedPartition) {
       val skewedSpecs = partitionSpecs.collect {
         case p: PartialReducerPartitionSpec => p
@@ -200,15 +209,7 @@ case class AQEShuffleReadExec private(
       val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions")
       val x = partitionSpecs.count(isCoalescedSpec)
       numCoalescedPartitionsMetric.set(x)
-      val numEmptyPartitionsMetric = metrics("numEmptyPartitions")
-      val y = child match {
-        case s: ShuffleQueryStageExec =>
-          s.mapStats.map(stats => stats.bytesByPartitionId.count(_ == 0)).getOrElse(0)
-        case _ => 0
-      }
-      numEmptyPartitionsMetric.set(y)
-      driverAccumUpdates ++= Seq(numCoalescedPartitionsMetric.id -> x,
-        numEmptyPartitionsMetric.id -> y)
+      driverAccumUpdates ++= Seq(numCoalescedPartitionsMetric.id -> x)
     }
 
     partitionDataSizes.foreach { dataSizes =>
@@ -223,7 +224,9 @@ case class AQEShuffleReadExec private(
 
   @transient override lazy val metrics: Map[String, SQLMetric] = {
     if (shuffleStage.isDefined) {
-      Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ {
+      Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions"),
+        "numEmptyPartitions" ->
+          SQLMetrics.createMetric(sparkContext, "number of empty partitions")) ++ {
         if (isLocalRead) {
           // We split the mapper partition evenly when creating local shuffle read, so no
           // data size info is available.
@@ -244,9 +247,7 @@ case class AQEShuffleReadExec private(
     } ++ {
       if (hasCoalescedPartition) {
         Map("numCoalescedPartitions" ->
-          SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"),
-          "numEmptyPartitions" ->
-            SQLMetrics.createMetric(sparkContext, "number of empty partitions"))
+          SQLMetrics.createMetric(sparkContext, "number of coalesced partitions"))
       } else {
         Map.empty
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index 4b650957e42c..28762f01d7a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -503,16 +503,14 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with SQLConfHelper
 
   test("SPARK-51505: log empty partition number metrics") {
     val test: SparkSession => Unit = { spark: SparkSession =>
-      withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
-        val df = spark.range(0, 1000, 1, 5).withColumn("value", when(col("id") < 500, 0)
-          .otherwise(1)).groupBy("value").agg("value" -> "sum")
-        df.collect()
-        val plan = df.queryExecution.executedPlan
-        val coalesce = collectFirst(plan) {
-          case e: AQEShuffleReadExec => e
-        }.get
-        assert(coalesce.metrics("numEmptyPartitions").value == 3)
-      }
+      val df = spark.range(0, 1000, 1, 10).withColumn("value", expr("id % 3"))
+        .groupBy("value").agg("value" -> "sum")
+      df.collect()
+      val plan = df.queryExecution.executedPlan
+      val coalesce = collectFirst(plan) {
+        case e: AQEShuffleReadExec => e
+      }.get
+      assert(coalesce.metrics("numEmptyPartitions").value == 2)
     }
     withSparkSession(test, 100, None)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 5d81239d023e..05c1012200df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1124,7 +1124,7 @@ class AdaptiveQueryExecSuite
         assert(reads.length == 1)
         val read = reads.head
         assert(read.isLocalRead)
-        assert(read.metrics.keys.toSeq == Seq("numPartitions"))
+        assert(read.metrics.keys.toSeq == Seq("numPartitions", "numEmptyPartitions"))
         assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
       }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]