This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 899f6c90eb2 [SPARK-39751][SQL] Rename hash aggregate key probes metric
899f6c90eb2 is described below

commit 899f6c90eb2de5b46a36710a131d7417010ce4b3
Author: Cheng Su <[email protected]>
AuthorDate: Wed Jul 13 22:47:13 2022 +0800

    [SPARK-39751][SQL] Rename hash aggregate key probes metric
    
    ### What changes were proposed in this pull request?
    
    Hash aggregate has a SQL metric to record average probes per key, but it 
has a very obsure name called "avg hash probe bucket list iters". We should 
give it a better name to avoid confusing users. This PR also changes related 
functions naming.
    
    ### Why are the changes needed?
    
    To give better naming for SQL metric, to avoid confusing users.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the changed SQL metric name.
    
    ### How was this patch tested?
    
    Existing unit tests in `SQLMetricsSuite.scala`.
    
    Closes #37164 from c21/agg-rename.
    
    Authored-by: Cheng Su <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java |  2 +-
 .../spark/sql/execution/UnsafeFixedWidthAggregationMap.java    |  6 +++---
 .../spark/sql/execution/aggregate/HashAggregateExec.scala      |  4 ++--
 .../sql/execution/aggregate/TungstenAggregationIterator.scala  |  2 +-
 .../apache/spark/sql/execution/metric/SQLMetricsSuite.scala    | 10 +++++-----
 5 files changed, 12 insertions(+), 12 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index f474c30b8b3..f4f4052b4fa 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -941,7 +941,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
   /**
    * Returns the average number of probes per key lookup.
    */
-  public double getAvgHashProbeBucketListIterations() {
+  public double getAvgHashProbesPerKey() {
     return (1.0 * numProbes) / numKeyLookups;
   }
 
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 43e174d7273..8587d929007 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -221,10 +221,10 @@ public final class UnsafeFixedWidthAggregationMap {
   }
 
   /**
-   * Gets the average bucket list iterations per lookup in the underlying 
`BytesToBytesMap`.
+   * Gets the average number of hash probes per key lookup in the underlying 
`BytesToBytesMap`.
    */
-  public double getAvgHashProbeBucketListIterations() {
-    return map.getAvgHashProbeBucketListIterations();
+  public double getAvgHashProbesPerKey() {
+    return map.getAvgHashProbesPerKey();
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 935844d96d9..6c83ba5546d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -68,7 +68,7 @@ case class HashAggregateExec(
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
     "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in 
aggregation build"),
     "avgHashProbe" ->
-      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list 
iters"),
+      SQLMetrics.createAverageMetric(sparkContext, "avg hash probes per key"),
     "numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of 
sort fallback tasks"))
 
   // This is for testing. We force TungstenAggregationIterator to fall back to 
the unsafe row hash
@@ -207,7 +207,7 @@ case class HashAggregateExec(
     metrics.incPeakExecutionMemory(maxMemory)
 
     // Update average hashmap probe
-    avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
+    avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
 
     if (sorter == null) {
       // not spilled
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 0a5e8838e15..36405fe9272 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -389,7 +389,7 @@ class TungstenAggregationIterator(
     metrics.incPeakExecutionMemory(maxMemory)
 
     // Updating average hashmap probe
-    avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
+    avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
   })
 
   ///////////////////////////////////////////////////////////////////////////
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index c1720915812..3bc3bb2a5de 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -111,11 +111,11 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probe bucket list iters" ->
+        "avg hash probes per key" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L),
       Map("number of output rows" -> 1L,
-        "avg hash probe bucket list iters" ->
+        "avg hash probes per key" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L))
     val shuffleExpected1 = Map(
@@ -133,11 +133,11 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
     val df2 = testData2.groupBy($"a").count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probe bucket list iters" ->
+        "avg hash probes per key" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L),
       Map("number of output rows" -> 3L,
-        "avg hash probe bucket list iters" ->
+        "avg hash probes per key" ->
           aggregateMetricsPattern,
         "number of sort fallback tasks" -> 0L))
 
@@ -186,7 +186,7 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
       }
       val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
       nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probe bucket list 
iters").toString
+        val probes = metrics(nodeId)._2("avg hash probes per key").toString
         if (!probes.contains("\n")) {
           // It's a single metrics value
           assert(probes.toDouble > 1.0)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to