This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6b5a1f9df28 Revert "[SPARK-38354][SQL] Add hash probes metric for shuffled hash join"
6b5a1f9df28 is described below
commit 6b5a1f9df28262fa90d28dc15af67e8a37a9efcf
Author: Cheng Su <[email protected]>
AuthorDate: Tue Apr 26 16:12:35 2022 -0700
Revert "[SPARK-38354][SQL] Add hash probes metric for shuffled hash join"
This reverts commit 158436655f30141bbd5afa8d95aec66282a5c4b4, as the
original PR caused a performance regression reported in
https://github.com/apache/spark/pull/35686#issuecomment-1107807027 .
Closes #36338 from c21/revert-metrics.
Authored-by: Cheng Su <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/unsafe/map/BytesToBytesMap.java | 2 +-
.../execution/UnsafeFixedWidthAggregationMap.java | 6 ++--
.../execution/aggregate/HashAggregateExec.scala | 4 +--
.../aggregate/TungstenAggregationIterator.scala | 2 +-
.../spark/sql/execution/joins/HashedRelation.scala | 35 ----------------------
.../sql/execution/joins/ShuffledHashJoinExec.scala | 10 ++-----
.../sql/execution/metric/SQLMetricsSuite.scala | 16 +++++-----
7 files changed, 16 insertions(+), 59 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index f4f4052b4fa..f474c30b8b3 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -941,7 +941,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
/**
* Returns the average number of probes per key lookup.
*/
- public double getAvgHashProbesPerKey() {
+ public double getAvgHashProbeBucketListIterations() {
return (1.0 * numProbes) / numKeyLookups;
}
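For context (not part of the commit): the accessor above reports the ratio of
total probes to total key lookups that BytesToBytesMap accumulates. A minimal
Scala sketch of that arithmetic; the ProbeStats class is hypothetical, and only
the counter names mirror the fields touched by this diff:

    // Hypothetical stand-in for the probe counters kept by BytesToBytesMap.
    // An average near 1.0 means most lookups found their key on the first probe.
    final class ProbeStats {
      private var numProbes: Long = 0L      // total hash probes performed
      private var numKeyLookups: Long = 0L  // total key lookups issued

      def recordLookup(probes: Long): Unit = {
        numKeyLookups += 1
        numProbes += probes
      }

      // Same ratio as getAvgHashProbeBucketListIterations above.
      def avgProbesPerLookup: Double = (1.0 * numProbes) / numKeyLookups
    }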
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 8587d929007..43e174d7273 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -221,10 +221,10 @@ public final class UnsafeFixedWidthAggregationMap {
}
/**
- * Gets the average number of hash probes per key lookup in the underlying `BytesToBytesMap`.
+ * Gets the average bucket list iterations per lookup in the underlying `BytesToBytesMap`.
*/
- public double getAvgHashProbesPerKey() {
- return map.getAvgHashProbesPerKey();
+ public double getAvgHashProbeBucketListIterations() {
+ return map.getAvgHashProbeBucketListIterations();
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 6c83ba5546d..935844d96d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -68,7 +68,7 @@ case class HashAggregateExec(
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
"aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in
aggregation build"),
"avgHashProbe" ->
- SQLMetrics.createAverageMetric(sparkContext, "avg hash probes per key"),
+ SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"),
"numTasksFallBacked" -> SQLMetrics.createMetric(sparkContext, "number of
sort fallback tasks"))
// This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
@@ -207,7 +207,7 @@ case class HashAggregateExec(
metrics.incPeakExecutionMemory(maxMemory)
// Update average hashmap probe
- avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
+ avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
if (sorter == null) {
// not spilled
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 36405fe9272..0a5e8838e15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -389,7 +389,7 @@ class TungstenAggregationIterator(
metrics.incPeakExecutionMemory(maxMemory)
// Updating average hashmap probe
- avgHashProbe.set(hashMap.getAvgHashProbesPerKey)
+ avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
})
///////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 253f16e39d3..11d3af4e546 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -110,11 +110,6 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
*/
def keys(): Iterator[InternalRow]
- /**
- * Returns the average number of hash probes per key lookup.
- */
- def getAvgHashProbesPerKey(): Double
-
/**
* Returns a read-only copy of this, to be safely used in current thread.
*/
@@ -226,8 +221,6 @@ private[joins] class UnsafeHashedRelation(
override def estimatedSize: Long = binaryMap.getTotalMemoryConsumption
- override def getAvgHashProbesPerKey(): Double = binaryMap.getAvgHashProbesPerKey
-
// re-used in get()/getValue()/getWithKeyIndex()/getValueWithKeyIndex()/valuesWithKeyIndex()
var resultRow = new UnsafeRow(numFields)
@@ -575,12 +568,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
// The number of unique keys.
private var numKeys = 0L
- // The number of hash probes for keys.
- private var numProbes = 0L
-
- // The number of keys lookups.
- private var numKeyLookups = 0L
-
// needed by serializer
def this() = {
this(
@@ -629,11 +616,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
*/
def getTotalMemoryConsumption: Long = array.length * 8L + page.length * 8L
- /**
- * Returns the average number of hash probes per key lookup.
- */
- def getAvgHashProbesPerKey: Double = (1.0 * numProbes) / numKeyLookups
-
/**
* Returns the first slot of array that store the keys (sparse mode).
*/
@@ -668,9 +650,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
* Returns the single UnsafeRow for given key, or null if not found.
*/
def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
- numKeyLookups += 1
if (isDense) {
- numProbes += 1
if (key >= minKey && key <= maxKey) {
val value = array((key - minKey).toInt)
if (value > 0) {
@@ -678,14 +658,12 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
}
} else {
- numProbes += 1
var pos = firstSlot(key)
while (array(pos + 1) != 0) {
if (array(pos) == key) {
return getRow(array(pos + 1), resultRow)
}
pos = nextSlot(pos)
- numProbes += 1
}
}
null
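For context (a simplified sketch, not the committed code): the sparse path above
is linear probing over a flat array, and the removed numProbes counter had been
incremented once per slot visited. firstSlot and nextSlot below are hypothetical
stand-ins for the map's real hashing helpers:

    // Simplified open-addressing lookup: even slots hold keys, odd slots hold
    // value addresses, and an address of 0 marks an empty slot.
    def lookup(array: Array[Long], key: Long,
               firstSlot: Long => Int, nextSlot: Int => Int): Long = {
      var pos = firstSlot(key)        // initial slot from the key's hash
      while (array(pos + 1) != 0) {   // stop at the first empty slot
        if (array(pos) == key) {
          return array(pos + 1)       // address of the key's first value row
        }
        pos = nextSlot(pos)           // advance along the probe chain
      }
      0L                              // not found
    }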
@@ -712,9 +690,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
* Returns an iterator for all the values for the given key, or null if no value found.
*/
def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
- numKeyLookups += 1
if (isDense) {
- numProbes += 1
if (key >= minKey && key <= maxKey) {
val value = array((key - minKey).toInt)
if (value > 0) {
@@ -722,14 +698,12 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
}
}
} else {
- numProbes += 1
var pos = firstSlot(key)
while (array(pos + 1) != 0) {
if (array(pos) == key) {
return valueIter(array(pos + 1), resultRow)
}
pos = nextSlot(pos)
- numProbes += 1
}
}
null
@@ -808,13 +782,10 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
* Update the address in array for given key.
*/
private def updateIndex(key: Long, address: Long): Unit = {
- numKeyLookups += 1
- numProbes += 1
var pos = firstSlot(key)
assert(numKeys < array.length / 2)
while (array(pos) != key && array(pos + 1) != 0) {
pos = nextSlot(pos)
- numProbes += 1
}
if (array(pos + 1) == 0) {
// this is the first value for this key, put the address in array.
@@ -1017,8 +988,6 @@ class LongHashedRelation(
override def estimatedSize: Long = map.getTotalMemoryConsumption
- override def getAvgHashProbesPerKey(): Double = map.getAvgHashProbesPerKey
-
override def get(key: InternalRow): Iterator[InternalRow] = {
if (key.isNullAt(0)) {
null
@@ -1136,8 +1105,6 @@ case object EmptyHashedRelation extends HashedRelation {
override def close(): Unit = {}
override def estimatedSize: Long = 0
-
- override def getAvgHashProbesPerKey(): Double = 0
}
/**
@@ -1164,8 +1131,6 @@ case object HashedRelationWithAllNullKeys extends HashedRelation {
override def close(): Unit = {}
override def estimatedSize: Long = 0
-
- override def getAvgHashProbesPerKey(): Double = 0
}
/** The HashedRelationBroadcastMode requires that rows are broadcasted as a HashedRelation. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 38c9c82f77e..cfe35d04778 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -49,8 +49,7 @@ case class ShuffledHashJoinExec(
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
"buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of
build side"),
- "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build
hash map"),
- "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash
probes per key"))
+ "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build
hash map"))
override def output: Seq[Attribute] = super[ShuffledJoin].output
@@ -78,7 +77,6 @@ case class ShuffledHashJoinExec(
def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
val buildDataSize = longMetric("buildDataSize")
val buildTime = longMetric("buildTime")
- val avgHashProbe = longMetric("avgHashProbe")
val start = System.nanoTime()
val context = TaskContext.get()
val relation = HashedRelation(
@@ -91,11 +89,7 @@ case class ShuffledHashJoinExec(
buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
buildDataSize += relation.estimatedSize
// This relation is usually used until the end of task.
- context.addTaskCompletionListener[Unit](_ => {
- // Update average hashmap probe
- avgHashProbe.set(relation.getAvgHashProbesPerKey())
- relation.close()
- })
+ context.addTaskCompletionListener[Unit](_ => relation.close())
relation
}
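For context (an illustrative sketch, not part of the commit): after the revert,
the build side only registers a close-on-completion callback. registerCleanup
below is a hypothetical helper; TaskContext.addTaskCompletionListener is the
real Spark API used above:

    import org.apache.spark.TaskContext

    // Close the build-side relation exactly once, when the task finishes,
    // mirroring the simplified listener in the hunk above.
    def registerCleanup(context: TaskContext, relation: AutoCloseable): Unit = {
      context.addTaskCompletionListener[Unit](_ => relation.close())
    }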
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 37a6d18e516..ee10ae2e334 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -109,11 +109,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val df = testData2.groupBy().count() // 2 partitions
val expected1 = Seq(
Map("number of output rows" -> 2L,
- "avg hash probes per key" ->
+ "avg hash probe bucket list iters" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L),
Map("number of output rows" -> 1L,
- "avg hash probes per key" ->
+ "avg hash probe bucket list iters" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L))
val shuffleExpected1 = Map(
@@ -131,11 +131,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val df2 = testData2.groupBy($"a").count()
val expected2 = Seq(
Map("number of output rows" -> 4L,
- "avg hash probes per key" ->
+ "avg hash probe bucket list iters" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L),
Map("number of output rows" -> 3L,
- "avg hash probes per key" ->
+ "avg hash probe bucket list iters" ->
aggregateMetricsPattern,
"number of sort fallback tasks" -> 0L))
@@ -184,7 +184,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
nodeIds.foreach { nodeId =>
- val probes = metrics(nodeId)._2("avg hash probes per key").toString
+ val probes = metrics(nodeId)._2("avg hash probe bucket list iters").toString
if (!probes.contains("\n")) {
// It's a single metrics value
assert(probes.toDouble > 1.0)
@@ -372,8 +372,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val df = df1.join(df2, "key")
testSparkPlanMetrics(df, 1, Map(
nodeId1 -> (("ShuffledHashJoin", Map(
- "number of output rows" -> 2L,
- "avg hash probes per key" -> aggregateMetricsPattern))),
+ "number of output rows" -> 2L))),
nodeId2 -> (("Exchange", Map(
"shuffle records written" -> 2L,
"records read" -> 2L))),
@@ -402,8 +401,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
testSparkPlanMetrics(df, 1, Map(
nodeId -> (("ShuffledHashJoin", Map(
- "number of output rows" -> rows,
- "avg hash probes per key" -> aggregateMetricsPattern)))),
+ "number of output rows" -> rows)))),
enableWholeStage
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]