This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a8482ad2a725 [SPARK-54272][SQL] Add aggTime for SortAggregateExec
a8482ad2a725 is described below

commit a8482ad2a7256e46e0c14a6a1e9477f8d498818f
Author: Angerszhuuuu <[email protected]>
AuthorDate: Thu Nov 27 13:35:32 2025 +0800

    [SPARK-54272][SQL] Add aggTime for SortAggregateExec
    
    ### What changes were proposed in this pull request?
    Add an `aggTime` metric ("time in aggregation build") to `SortAggregateExec`.
    
    ### Why are the changes needed?
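    `SortAggregateExec` previously reported only `numOutputRows`; this adds a timing metric for the time spent aggregating rows.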
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the SQL metric "time in aggregation build" is now shown for `SortAggregateExec` on the Spark UI.
    
    ### How was this patch tested?
    Added a unit test ("SortAggregate metrics") to `SQLMetricsSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52968 from AngersZhuuuu/SPARK-54272.
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/execution/aggregate/SortAggregateExec.scala  |  7 +++++--
 .../aggregate/SortBasedAggregationIterator.scala     |  7 ++++++-
 .../spark/sql/execution/metric/SQLMetricsSuite.scala | 20 ++++++++++++++++++++
 3 files changed, 31 insertions(+), 3 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index c4ff2454ae67..06f87af50eb5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -45,7 +45,8 @@ case class SortAggregateExec(
   with OrderPreservingUnaryExecNode {
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in 
aggregation build"))
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
     groupingExpressions.map(SortOrder(_, Ascending)) :: Nil
@@ -57,6 +58,7 @@ case class SortAggregateExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
+    val aggTime = longMetric("aggTime")
     child.execute().mapPartitionsWithIndexInternal { (partIndex, iter) =>
       // Because the constructor of an aggregation iterator will read at least 
the first row,
       // we need to get the value of iter.hasNext first.
@@ -77,7 +79,8 @@ case class SortAggregateExec(
           resultExpressions,
           (expressions, inputSchema) =>
             MutableProjection.create(expressions, inputSchema),
-          numOutputRows)
+          numOutputRows,
+          aggTime)
         if (!hasInput && groupingExpressions.isEmpty) {
           // There is no input and there is no grouping expressions.
           // We need to output a single row as the output.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 143190a88821..a1c70066ee03 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.aggregate
 
+import scala.concurrent.duration.NANOSECONDS
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -38,7 +40,8 @@ class SortBasedAggregationIterator(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     newMutableProjection: (Seq[Expression], Seq[Attribute]) => 
MutableProjection,
-    numOutputRows: SQLMetric)
+    numOutputRows: SQLMetric,
+    aggTime: SQLMetric)
   extends AggregationIterator(
     partIndex,
     groupingExpressions,
@@ -122,6 +125,7 @@ class SortBasedAggregationIterator(
     // Now, we will start to find all rows belonging to this group.
     // We create a variable to track if we see the next group.
     var findNextPartition = false
+    val startNs = System.nanoTime()
     // firstRowInNextGroup is the first row of this group. We first process it.
     processRow(sortBasedAggregationBuffer, firstRowInNextGroup)
 
@@ -142,6 +146,7 @@ class SortBasedAggregationIterator(
         firstRowInNextGroup = currentRow.copy()
       }
     }
+    aggTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
     // We have not seen a new group. It means that there is no new row in the 
input
     // iter. The current group is the last group of the iter.
     if (!findNextPartition) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 402365a59ece..47efcaaa4e1b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -308,6 +308,26 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
     }
   }
 
+  test("SortAggregate metrics") {
+    // Force use SortAggregateExec instead of HashAggregateExec
+    withSQLConf("spark.sql.test.forceApplySortAggregate" -> "true") {
+      // Assume the execution plan is
+      // -> SortAggregate(nodeId = 0)
+      //     -> Sort(nodeId = 1)
+      //       -> Exchange(nodeId = 2)
+      //          -> SortAggregate(...)
+      val df = testData2.groupBy($"a").count()
+
+      // Test aggTime metric for grouped aggregate
+      testSparkPlanMetricsWithPredicates(df, 1, Map(
+        0L -> (("SortAggregate", Map(
+          "time in aggregation build" -> {
+            _.toString.matches(timingMetricPattern)
+          }))))
+      )
+    }
+  }
+
   test("Sort metrics") {
     // Assume the execution plan with node id is
     // Sort(nodeId = 0)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to