This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a8482ad2a725 [SPARK-54272][SQL] Add aggTime for SortAggregateExec
a8482ad2a725 is described below
commit a8482ad2a7256e46e0c14a6a1e9477f8d498818f
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Nov 27 13:35:32 2025 +0800
[SPARK-54272][SQL] Add aggTime for SortAggregateExec
### What changes were proposed in this pull request?
Add an `aggTime` metric for `SortAggregateExec`
### Why are the changes needed?
Add more metrics
### Does this PR introduce _any_ user-facing change?
Yes. The SQL metric "time in aggregation build" is now shown for `SortAggregateExec` on the Spark UI.
### How was this patch tested?
UT
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52968 from AngersZhuuuu/SPARK-54272.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/aggregate/SortAggregateExec.scala | 7 +++++--
.../aggregate/SortBasedAggregationIterator.scala | 7 ++++++-
.../spark/sql/execution/metric/SQLMetricsSuite.scala | 20 ++++++++++++++++++++
3 files changed, 31 insertions(+), 3 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index c4ff2454ae67..06f87af50eb5 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -45,7 +45,8 @@ case class SortAggregateExec(
with OrderPreservingUnaryExecNode {
override lazy val metrics = Map(
- "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"))
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
+ "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in
aggregation build"))
override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
groupingExpressions.map(SortOrder(_, Ascending)) :: Nil
@@ -57,6 +58,7 @@ case class SortAggregateExec(
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
+ val aggTime = longMetric("aggTime")
child.execute().mapPartitionsWithIndexInternal { (partIndex, iter) =>
// Because the constructor of an aggregation iterator will read at least
the first row,
// we need to get the value of iter.hasNext first.
@@ -77,7 +79,8 @@ case class SortAggregateExec(
resultExpressions,
(expressions, inputSchema) =>
MutableProjection.create(expressions, inputSchema),
- numOutputRows)
+ numOutputRows,
+ aggTime)
if (!hasInput && groupingExpressions.isEmpty) {
// There is no input and there is no grouping expressions.
// We need to output a single row as the output.
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 143190a88821..a1c70066ee03 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.aggregate
+import scala.concurrent.duration.NANOSECONDS
+
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -38,7 +40,8 @@ class SortBasedAggregationIterator(
initialInputBufferOffset: Int,
resultExpressions: Seq[NamedExpression],
newMutableProjection: (Seq[Expression], Seq[Attribute]) =>
MutableProjection,
- numOutputRows: SQLMetric)
+ numOutputRows: SQLMetric,
+ aggTime: SQLMetric)
extends AggregationIterator(
partIndex,
groupingExpressions,
@@ -122,6 +125,7 @@ class SortBasedAggregationIterator(
// Now, we will start to find all rows belonging to this group.
// We create a variable to track if we see the next group.
var findNextPartition = false
+ val startNs = System.nanoTime()
// firstRowInNextGroup is the first row of this group. We first process it.
processRow(sortBasedAggregationBuffer, firstRowInNextGroup)
@@ -142,6 +146,7 @@ class SortBasedAggregationIterator(
firstRowInNextGroup = currentRow.copy()
}
}
+ aggTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
// We have not seen a new group. It means that there is no new row in the
input
// iter. The current group is the last group of the iter.
if (!findNextPartition) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 402365a59ece..47efcaaa4e1b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -308,6 +308,26 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils
}
}
+ test("SortAggregate metrics") {
+ // Force use SortAggregateExec instead of HashAggregateExec
+ withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
+ // Assume the execution plan is
+ // -> SortAggregate(nodeId = 0)
+ // -> Sort(nodeId = 1)
+ // -> Exchange(nodeId = 2)
+ // -> SortAggregate(...)
+ val df = testData2.groupBy($"a").count()
+
+ // Test aggTime metric for grouped aggregate
+ testSparkPlanMetricsWithPredicates(df, 1, Map(
+ 0L -> (("SortAggregate", Map(
+ "time in aggregation build" -> {
+ _.toString.matches(timingMetricPattern)
+ }))))
+ )
+ }
+ }
+
test("Sort metrics") {
// Assume the execution plan with node id is
// Sort(nodeId = 0)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]