This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new c59e54f [SPARK-35639][SQL] Add metrics about coalesced partitions to
AQEShuffleRead in AQE
c59e54f is described below
commit c59e54fe0e35315504132340135676955c7e0d16
Author: Eugene Koifman <[email protected]>
AuthorDate: Wed Jul 28 13:49:48 2021 +0800
[SPARK-35639][SQL] Add metrics about coalesced partitions to AQEShuffleRead
in AQE
### What changes were proposed in this pull request?
AQEShuffleReadExec already reports "number of skewed partitions" and
"number of skewed partition splits".
It would be useful to also report "number of coalesced partitions" and for
ShuffleExchange to report "number of partitions"
This way it's clear what happened on the map side and on the reduce side.

### Why are the changes needed?
Improves usability
### Does this PR introduce _any_ user-facing change?
Yes, it now provides more information about `AQEShuffleReadExec` operator
behavior in the metrics system.
### How was this patch tested?
Existing tests
Closes #32776 from ekoifman/PRISM-91635-customshufflereader-sql-metrics.
Authored-by: Eugene Koifman <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 41a16ebf1196bec86aec104e72fd7fb1597c0073)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/adaptive/AQEShuffleReadExec.scala | 30 +++++++++++++++++-----
.../execution/exchange/ShuffleExchangeExec.scala | 10 ++++++--
.../scala/org/apache/spark/sql/ExplainSuite.scala | 1 +
.../adaptive/AdaptiveQueryExecSuite.scala | 3 ++-
4 files changed, 35 insertions(+), 9 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index d897507..0768b9b 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -90,15 +90,19 @@ case class AQEShuffleReadExec private(
}
/**
+ * Returns true iff some partitions were actually combined
+ */
+ private def isCoalesced(spec: ShufflePartitionSpec) = spec match {
+ case CoalescedPartitionSpec(0, 0, _) => true
+ case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex
> 1
+ case _ => false
+ }
+
+ /**
* Returns true iff some non-empty partitions were combined
*/
def hasCoalescedPartition: Boolean = {
- partitionSpecs.exists {
- // shuffle from empty RDD
- case CoalescedPartitionSpec(0, 0, _) => true
- case s: CoalescedPartitionSpec => s.endReducerIndex -
s.startReducerIndex > 1
- case _ => false
- }
+ partitionSpecs.exists(isCoalesced)
}
def hasSkewedPartition: Boolean =
@@ -153,6 +157,13 @@ case class AQEShuffleReadExec private(
driverAccumUpdates += (skewedSplits.id -> numSplits)
}
+ if (hasCoalescedPartition) {
+ val numCoalescedPartitionsMetric = metrics("numCoalescedPartitions")
+ val x = partitionSpecs.count(isCoalesced)
+ numCoalescedPartitionsMetric.set(x)
+ driverAccumUpdates += numCoalescedPartitionsMetric.id -> x
+ }
+
partitionDataSizes.foreach { dataSizes =>
val partitionDataSizeMetrics = metrics("partitionDataSize")
driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _)
@@ -183,6 +194,13 @@ case class AQEShuffleReadExec private(
} else {
Map.empty
}
+ } ++ {
+ if (hasCoalescedPartition) {
+ Map("numCoalescedPartitions" ->
+ SQLMetrics.createMetric(sparkContext, "number of coalesced
partitions"))
+ } else {
+ Map.empty
+ }
}
} else {
// It's a canonicalized plan, no need to report metrics.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 5a45af6..c033aed 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -123,7 +123,8 @@ case class ShuffleExchangeExec(
private[sql] lazy val readMetrics =
SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics = Map(
- "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
+ "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+ "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of
partitions")
) ++ readMetrics ++ writeMetrics
override def nodeName: String = "Exchange"
@@ -164,12 +165,17 @@ case class ShuffleExchangeExec(
*/
@transient
lazy val shuffleDependency : ShuffleDependency[Int, InternalRow,
InternalRow] = {
- ShuffleExchangeExec.prepareShuffleDependency(
+ val dep = ShuffleExchangeExec.prepareShuffleDependency(
inputRDD,
child.output,
outputPartitioning,
serializer,
writeMetrics)
+ metrics("numPartitions").set(dep.partitioner.numPartitions)
+ val executionId =
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+ SQLMetrics.postDriverMetricUpdates(
+ sparkContext, executionId, metrics("numPartitions") :: Nil)
+ dep
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index fbbdd42..2086f97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -572,6 +572,7 @@ class ExplainSuiteAE extends ExplainSuiteHelper with
EnableAdaptiveExecutionSuit
"""
|(11) AQEShuffleRead
|Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
+ |Arguments: coalesced
|""".stripMargin,
"""
|(16) BroadcastHashJoin
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 46ca786..dda94f1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -927,7 +927,8 @@ class AdaptiveQueryExecSuite
assert(!read.hasSkewedPartition)
assert(read.hasCoalescedPartition)
assert(read.metrics.keys.toSeq.sorted == Seq(
- "numPartitions", "partitionDataSize"))
+ "numCoalescedPartitions", "numPartitions", "partitionDataSize"))
+ assert(read.metrics("numCoalescedPartitions").value == 1)
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
assert(read.metrics("partitionDataSize").value > 0)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]