This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0f4a9ac18f7 [SPARK-40224][SQL] Make ObjectHashAggregateExec release
memory eagerly when fallback to sort-based
0f4a9ac18f7 is described below
commit 0f4a9ac18f7af9238a46d6bc8a40f593556451a2
Author: ulysses-you <[email protected]>
AuthorDate: Mon Aug 29 15:22:07 2022 +0800
[SPARK-40224][SQL] Make ObjectHashAggregateExec release memory eagerly when
fallback to sort-based
### What changes were proposed in this pull request?
Release the fetched key and aggregate buffer during iteration when the
object hash-based aggregation falls back to sort-based.
### Why are the changes needed?
When the cardinality of the grouping keys grows over the
fallbackCountThreshold, the ObjectHashAggregateExec will dump the hash map
whose memory usage is not managed to sort-based. However, at this moment, we
will keep the double memory overhead for both hash map and unsafe object, and
it may cause OOM.
Unfortunately, we encountered this issue. The error msg:
```
ObjectAggregationIterator INFO - Aggregation hash map size 128 reaches
threshold capacity (128 entries), spilling and falling back to sort based
aggregation. You may change the threshold by adjust option
spark.sql.objectHashAggregate.sortBased.fallbackThreshold
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
# Executing /bin/sh -c "kill 46725"...
```
### Does this PR introduce _any_ user-facing change?
To reduce the possibility of OOM issue when ObjectHashAggregateExec
fallback to sort-based.
### How was this patch tested?
It's an obvious improvement that releases unused objects.
Closes #37668 from ulysses-you/object-hash.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/aggregate/ObjectAggregationIterator.scala | 2 +-
.../spark/sql/execution/aggregate/ObjectAggregationMap.scala | 9 +++++++--
.../sql/execution/aggregate/SortBasedAggregationStoreSuite.scala | 3 ++-
3 files changed, 10 insertions(+), 4 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
index ebb6ee38524..1d89e56eebd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
@@ -207,7 +207,7 @@ class ObjectAggregationIterator(
if (sortBased) {
aggBufferIterator = sortBasedAggregationStore.destructiveIterator()
} else {
- aggBufferIterator = hashMap.iterator
+ aggBufferIterator = hashMap.destructiveIterator()
}
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
index 9f2cf84a6d7..6aede04b069 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
@@ -45,7 +45,11 @@ class ObjectAggregationMap() {
def size: Int = hashMap.size()
- def iterator: Iterator[AggregationBufferEntry] = {
+ /**
+ * Returns a destructive iterator of AggregationBufferEntry.
+ * Notice: it is illegal to call any method after `destructiveIterator()`
has been called.
+ */
+ def destructiveIterator(): Iterator[AggregationBufferEntry] = {
val iter = hashMap.entrySet().iterator()
new Iterator[AggregationBufferEntry] {
@@ -54,6 +58,7 @@ class ObjectAggregationMap() {
}
override def next(): AggregationBufferEntry = {
val entry = iter.next()
+ iter.remove()
new AggregationBufferEntry(entry.getKey, entry.getValue)
}
}
@@ -77,7 +82,7 @@ class ObjectAggregationMap() {
null
)
- val mapIterator = iterator
+ val mapIterator = destructiveIterator()
val unsafeAggBufferProjection =
UnsafeProjection.create(aggBufferAttributes.map(_.dataType).toArray)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
index a672a3fb1b3..4a0c88be423 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
@@ -124,7 +124,8 @@ class SortBasedAggregationStoreSuite extends SparkFunSuite
with LocalSparkConte
def createSortedAggBufferIterator(
hashMap: ObjectAggregationMap): KVIterator[UnsafeRow, UnsafeRow] = {
- val sortedIterator =
hashMap.iterator.toList.sortBy(_.groupingKey.getInt(0)).iterator
+ val sortedIterator =
hashMap.destructiveIterator().toList.sortBy(_.groupingKey.getInt(0))
+ .iterator
new KVIterator[UnsafeRow, UnsafeRow] {
var key: UnsafeRow = null
var value: UnsafeRow = null
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]