This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f4a9ac18f7 [SPARK-40224][SQL] Make ObjectHashAggregateExec release 
memory eagerly when fallback to sort-based
0f4a9ac18f7 is described below

commit 0f4a9ac18f7af9238a46d6bc8a40f593556451a2
Author: ulysses-you <[email protected]>
AuthorDate: Mon Aug 29 15:22:07 2022 +0800

    [SPARK-40224][SQL] Make ObjectHashAggregateExec release memory eagerly when 
fallback to sort-based
    
    ### What changes were proposed in this pull request?
    
    Release each fetched key and aggregate buffer during iteration when the 
object hash-based aggregation dumps to sort-based.
    
    ### Why are the changes needed?
    
    When the cardinality of the grouping keys grows beyond the 
fallbackCountThreshold, ObjectHashAggregateExec dumps the hash map, whose 
memory usage is not managed, to the sort-based store. However, at that moment 
we keep double the memory overhead, for both the hash map and the unsafe 
objects, and this may cause an OOM.
    
    Unfortunately, we encountered this issue. The error message:
    ```
    ObjectAggregationIterator INFO - Aggregation hash map size 128 reaches 
threshold capacity (128 entries), spilling and falling back to sort based 
aggregation. You may change the threshold by adjust option 
spark.sql.objectHashAggregate.sortBased.fallbackThreshold
    
    #
    # java.lang.OutOfMemoryError: Java heap space
    # -XX:OnOutOfMemoryError="kill %p"
    #   Executing /bin/sh -c "kill 46725"...
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    It reduces the possibility of an OOM when ObjectHashAggregateExec 
falls back to sort-based aggregation.
    
    ### How was this patch tested?
    
    It's an obvious improvement that releases unused objects.
    
    Closes #37668 from ulysses-you/object-hash.
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/execution/aggregate/ObjectAggregationIterator.scala      | 2 +-
 .../spark/sql/execution/aggregate/ObjectAggregationMap.scala     | 9 +++++++--
 .../sql/execution/aggregate/SortBasedAggregationStoreSuite.scala | 3 ++-
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
index ebb6ee38524..1d89e56eebd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationIterator.scala
@@ -207,7 +207,7 @@ class ObjectAggregationIterator(
     if (sortBased) {
       aggBufferIterator = sortBasedAggregationStore.destructiveIterator()
     } else {
-      aggBufferIterator = hashMap.iterator
+      aggBufferIterator = hashMap.destructiveIterator()
     }
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
index 9f2cf84a6d7..6aede04b069 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectAggregationMap.scala
@@ -45,7 +45,11 @@ class ObjectAggregationMap() {
 
   def size: Int = hashMap.size()
 
-  def iterator: Iterator[AggregationBufferEntry] = {
+  /**
+   * Returns a destructive iterator of AggregationBufferEntry.
+   * Notice: it is illegal to call any method after `destructiveIterator()` 
has been called.
+   */
+  def destructiveIterator(): Iterator[AggregationBufferEntry] = {
     val iter = hashMap.entrySet().iterator()
     new Iterator[AggregationBufferEntry] {
 
@@ -54,6 +58,7 @@ class ObjectAggregationMap() {
       }
       override def next(): AggregationBufferEntry = {
         val entry = iter.next()
+        iter.remove()
         new AggregationBufferEntry(entry.getKey, entry.getValue)
       }
     }
@@ -77,7 +82,7 @@ class ObjectAggregationMap() {
       null
     )
 
-    val mapIterator = iterator
+    val mapIterator = destructiveIterator()
     val unsafeAggBufferProjection =
       UnsafeProjection.create(aggBufferAttributes.map(_.dataType).toArray)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
index a672a3fb1b3..4a0c88be423 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationStoreSuite.scala
@@ -124,7 +124,8 @@ class SortBasedAggregationStoreSuite  extends SparkFunSuite 
with LocalSparkConte
   def createSortedAggBufferIterator(
       hashMap: ObjectAggregationMap): KVIterator[UnsafeRow, UnsafeRow] = {
 
-    val sortedIterator = 
hashMap.iterator.toList.sortBy(_.groupingKey.getInt(0)).iterator
+    val sortedIterator = 
hashMap.destructiveIterator().toList.sortBy(_.groupingKey.getInt(0))
+      .iterator
     new KVIterator[UnsafeRow, UnsafeRow] {
       var key: UnsafeRow = null
       var value: UnsafeRow = null


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to