This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 899f6c90eb2 [SPARK-39751][SQL] Rename hash aggregate key probes metric
899f6c90eb2 is described below
commit 899f6c90eb2de5b46a36710a131d7417010ce4b3
Author: Cheng Su <[email protected]>
AuthorDate: Wed Jul 13 22:47:13 2022 +0800
[SPARK-39751][SQL] Rename hash aggregate key probes metric
### What changes were proposed in this pull request?
Hash aggregate has a SQL metric to record average probes per key, but it
has a very obscure name called "avg hash probe bucket list iters". We should
give it a better name to avoid confusing users. This PR also changes related
functions naming.
### Why are the changes needed?
To give better naming for SQL metric, to avoid confusing users.
### Does this PR introduce _any_ user-facing change?
Yes, the changed SQL metric name.
### How was this patch tested?
Existing unit tests in `SQLMetricsSuite.scala`.
Closes #37164 from c21/agg-rename.
Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java | 2 +-
.../spark/sql/execution/UnsafeFixedWidthAggregationMap.java | 6 +++---
.../spark/sql/execution/aggregate/HashAggregateExec.scala | 4 ++--
.../sql/execution/aggregate/TungstenAggregationIterator.scala | 2 +-
.../apache/spark/sql/execution/metric/SQLMetricsSuite.scala | 10 +++++-----
5 files changed, 12 insertions(+), 12 deletions(-)
diff --git
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index f474c30b8b3..f4f4052b4fa 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -941,7 +941,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
/**
* Returns the average number of probes per key lookup.
*/
- public double getAvgHashProbeBucketListIterations() {
+ public double getAvgHashProbesPerKey() {
return (1.0 * numProbes) / numKeyLookups;
}
diff --git
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 43e174d7273..8587d929007 100644
---
a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++
b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -221,10 +221,10 @@ public final class UnsafeFixedWidthAggregationMap {
}
/**
- * Gets the average bucket list iterations per lookup in the underlying
`BytesToBytesMap`.
+ * Gets the average number of hash probes per key lookup in the underlying
`BytesToBytesMap`.
*/
- public double getAvgHashProbeBucketListIterations() {
- return map.getAvgHashProbeBucketListIterations();
+ public double getAvgHashProbesPerKey() {
+ return map.getAvgHashProbesPerKey();
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 935844d96d9..6c83ba5546d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -68,7 +68,7 @@ case class HashAggregateExec(
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
"aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in
aggregation build"),
"avgHashProbe" ->
- SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list
iters"),
+ SQLMetrics.createAverageMetric(sparkContext, "avg hash probes per key"),
"numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of
sort fallback tasks"))
// This is for testing. We force TungstenAggregationIterator to fall back to
the unsafe row hash
@@ -207,7 +207,7 @@ case class HashAggregateExec(
metrics.incPeakExecutionMemory(maxMemory)
// Update average hashmap probe
- avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
+ avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
if (sorter == null) {
// not spilled
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 0a5e8838e15..36405fe9272 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -389,7 +389,7 @@ class TungstenAggregationIterator(
metrics.incPeakExecutionMemory(maxMemory)
// Updating average hashmap probe
- avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
+ avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
})
///////////////////////////////////////////////////////////////////////////
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index c1720915812..3bc3bb2a5de 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -111,11 +111,11 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils
val df = testData2.groupBy().count() // 2 partitions
val expected1 = Seq(
Map("number of output rows" -> 2L,
- "avg hash probe bucket list iters" ->
+ "avg hash probes per key" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L),
Map("number of output rows" -> 1L,
- "avg hash probe bucket list iters" ->
+ "avg hash probes per key" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L))
val shuffleExpected1 = Map(
@@ -133,11 +133,11 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils
val df2 = testData2.groupBy($"a").count()
val expected2 = Seq(
Map("number of output rows" -> 4L,
- "avg hash probe bucket list iters" ->
+ "avg hash probes per key" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L),
Map("number of output rows" -> 3L,
- "avg hash probe bucket list iters" ->
+ "avg hash probes per key" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L))
@@ -186,7 +186,7 @@ class SQLMetricsSuite extends SharedSparkSession with
SQLMetricsTestUtils
}
val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
nodeIds.foreach { nodeId =>
- val probes = metrics(nodeId)._2("avg hash probe bucket list
iters").toString
+ val probes = metrics(nodeId)._2("avg hash probes per key").toString
if (!probes.contains("\n")) {
// It's a single metrics value
assert(probes.toDouble > 1.0)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]