[ 
https://issues.apache.org/jira/browse/SPARK-48483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870074#comment-17870074
 ] 

Jin Chengcheng commented on SPARK-48483:
----------------------------------------

By design, a Spark operator currently spills only its own memory, but some 
extensions such as Gluten may need to trigger a spill in other operators. Do 
you have any suggestions?

> Allow UnsafeExternalSorter to spill when other consumer requests memory
> -----------------------------------------------------------------------
>
>                 Key: SPARK-48483
>                 URL: https://issues.apache.org/jira/browse/SPARK-48483
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 4.0.0
>         Environment: Ubuntu
>            Reporter: Jin Chengcheng
>            Priority: Major
>             Fix For: 4.0.0
>
>
> Downstream, Gluten (a native Spark engine) hits an OOM exception.
>  
> {code:java}
> 24/04/27 11:42:59 ERROR [Executor task launch worker for task 403.0 in stage 
> 4.0 (TID 91404)] nmm.ManagedReservationListener: Error reserving memory from 
> target
> org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget$OutOfMemoryException:
>  Not enough spark off-heap execution memory. Acquired: 40.0 MiB, granted: 8.0 
> MiB. Try tweaking config option spark.memory.offHeap.size to get larger space 
> to run this application. 
> Current config settings: 
>       spark.gluten.memory.offHeap.size.in.bytes=50.0 GiB
>       spark.gluten.memory.task.offHeap.size.in.bytes=12.5 GiB
>       spark.gluten.memory.conservative.task.offHeap.size.in.bytes=6.3 GiB
> Memory consumer stats: 
>       Task.91404:                                                             
>       Current used bytes:   12.5 GiB, peak bytes:        N/A
>       +- 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@a7836d4: 
> Current used bytes:   12.4 GiB, peak bytes:        N/A
>       \- Gluten.Tree.194:                                                     
>       Current used bytes:   56.0 MiB, peak bytes:   11.7 GiB
>          \- root.194:                                                         
>       Current used bytes:   56.0 MiB, peak bytes:   11.7 GiB
>             +- WholeStageIterator.194:                                        
>       Current used bytes:   32.0 MiB, peak bytes:    9.0 GiB
>             |  \- single:                                                     
>       Current used bytes:   23.0 MiB, peak bytes:    9.0 GiB
>             |     +- task.Gluten_Stage_4_TID_91404:                           
>       Current used bytes:   23.0 MiB, peak bytes:    9.0 GiB
>             |     |  +- node.3:                                               
>       Current used bytes:   21.0 MiB, peak bytes:    9.0 GiB
>             |     |  |  +- op.3.1.0.HashBuild:                                
>       Current used bytes:   10.8 MiB, peak bytes:    8.5 GiB
>             |     |  |  \- op.3.0.0.HashProbe:                                
>       Current used bytes:    9.2 MiB, peak bytes:   21.6 MiB
>             |     |  +- node.5:                                               
>       Current used bytes: 1024.0 KiB, peak bytes:    2.0 MiB
>             |     |  |  \- op.5.0.0.FilterProject:                            
>       Current used bytes:  129.4 KiB, peak bytes: 1232.0 KiB
>             |     |  +- node.2:                                               
>       Current used bytes: 1024.0 KiB, peak bytes: 1024.0 KiB
>             |     |  |  \- op.2.1.0.FilterProject:                            
>       Current used bytes:  128.4 KiB, peak bytes:  192.4 KiB
>             |     |  +- node.1:                                               
>       Current used bytes:      0.0 B, peak bytes:      0.0 B
>             |     |  |  \- op.1.1.0.ValueStream:                              
>       Current used bytes:      0.0 B, peak bytes:      0.0 B
>             |     |  +- node.0:                                               
>       Current used bytes:      0.0 B, peak bytes:      0.0 B
>             |     |  |  \- op.0.0.0.ValueStream:                              
>       Current used bytes:      0.0 B, peak bytes:      0.0 B
>             |     |  \- node.4:                                               
>       Current used bytes:      0.0 B, peak bytes:      0.0 B
>             |     |     \- op.4.0.0.FilterProject:                            
>       Current used bytes:      0.0 B, peak bytes:      0.0 B
>             |     \- WholeStageIterator_default_leaf:                         
>       Current used bytes:      0.0 B, peak bytes:      0.0 B
>             +- ArrowContextInstance.0:                                        
>       Current used bytes:    8.0 MiB, peak bytes:    8.0 MiB
>             +- ColumnarToRow.2:                                               
>       Current used bytes:    8.0 MiB, peak bytes:   16.0 MiB
>             |  \- single:                                                     
>       Current used bytes:    6.0 MiB, peak bytes:    9.0 MiB
>             |     \- ColumnarToRow_default_leaf:                              
>       Current used bytes:    6.0 MiB, peak bytes:    9.0 MiB
>             +- ShuffleReader.3:                                               
>       Current used bytes:    8.0 MiB, peak bytes:   16.0 MiB
>             |  \- single:                                                     
>       Current used bytes:    2.0 MiB, peak bytes:    5.0 MiB
>             |     \- ShuffleReader_default_leaf:                              
>       Current used bytes: 1408.0 KiB, peak bytes:    4.1 MiB
>             +- OverAcquire.DummyTarget.392:                                   
>       Current used bytes:      0.0 B, peak bytes:    4.8 MiB
>             +- OverAcquire.DummyTarget.385:                                   
>       Current used bytes:      0.0 B, peak bytes:    4.8 MiB
>             +- OverAcquire.DummyTarget.389:                                   
>       Current used bytes:      0.0 B, peak bytes:    2.7 GiB
>             \- ArrowContextInstance.6:                                        
>       Current used bytes:      0.0 B, peak bytes:      0.0 B
>       at 
> org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.borrow(ThrowOnOomMemoryTarget.java:90)
>       at 
> org.apache.gluten.memory.nmm.ManagedReservationListener.reserve(ManagedReservationListener.java:43)
>       at 
> org.apache.gluten.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native 
> Method)
>       at 
> org.apache.gluten.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:65)
>       at 
> org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
>       at 
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
>       at 
> org.apache.gluten.utils.InvocationFlowProtection.hasNext(Iterators.scala:135)
>       at org.apache.gluten.utils.IteratorCompleter.hasNext(Iterators.scala:69)
>       at org.apache.gluten.utils.PayloadCloser.hasNext(Iterators.scala:35)
>       at 
> org.apache.gluten.utils.PipelineTimeAccumulator.hasNext(Iterators.scala:98)
>       at 
> org.apache.gluten.execution.VeloxColumnarToRowExec$$anon$1.hasNext(VeloxColumnarToRowExec.scala:131)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>       at 
> scala.collection.TraversableOnce$FlattenOps$$anon$2.hasNext(TraversableOnce.scala:469)
>       at 
> org.apache.gluten.utils.InvocationFlowProtection.hasNext(Iterators.scala:135)
>       at org.apache.gluten.utils.IteratorCompleter.hasNext(Iterators.scala:69)
>       at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>       at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
>       at 
> org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
>       at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:984)
>       at 
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:984)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:131)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1470)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> W20240427 11:42:59.897979 96780 HashBuild.cpp:503] Failed to reserve 37.37MB 
> for memory pool op.3.1.0.HashBuild, usage: 10.85MB, reservation: 11.00MB
> W20240427 11:43:00.021775 96780 HashProbe.cpp:1573] Can't reclaim from hash 
> probe operator, state_[RUNNING], nonReclaimableSection_[0], 
> op.3.0.0.HashProbe, usage: 10.20MB, node pool usage: 13.00MB {code}
>  
>  
> The cause is that a user-defined MemoryConsumer tries to acquire memory while 
> the UnsafeExternalSorter MemoryConsumer holds 12.4 GiB, about 99% of the task 
> memory, yet does not spill to disk because the request comes from a different 
> consumer.
>  
> {code:java}
> public long spill(long size, MemoryConsumer trigger) throws IOException {
>   // Reason for not spilling: the request comes from another consumer.
>   if (trigger != this) {
>     if (readingIterator != null) {
>       return readingIterator.spill();
>     }
>     return 0L; // this should throw exception
>   } {code}
>  
>  
> In UnsafeExternalRowSorter, inputIterator.hasNext() acquires memory through 
> the user-defined MemoryConsumer, insertRow requests memory and may spill to 
> disk, and sort() generates the sortedIterator.
>  
> So if generating an UnsafeRow requires a large amount of memory, the 
> allocation may fail. The user-defined consumer tries to obtain memory by 
> asking UnsafeExternalSorter to spill, but sortedIterator is still empty, so 
> UnsafeExternalSorter releases no memory.
> {code:java}
> public Iterator<InternalRow> sort(Iterator<UnsafeRow> inputIterator) throws 
> IOException {
>   while (inputIterator.hasNext()) {
>     insertRow(inputIterator.next());
>   }
>   return sort();
> }{code}
>  
> A possible solution is to change the spill check to `if (trigger != this && 
> readingIterator != null)`, but other consumers may still have memory to 
> release, in which case this sorter would spill earlier than necessary.
>  
> Another is to add methods named `acquireMemoryExausted` and `forceSpill` to 
> MemoryConsumer and `acquireExecutionMemoryExausted` to TaskMemoryManager. 
> These would let a consumer spill, with side effects, when memory cannot be 
> acquired from any of the available consumers. For now, only `forceSpill` in 
> UnsafeExternalSorter would return memory.
> Once a solution is agreed on, I can help implement it.
> If you have better suggestions, please share them. Thanks!
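
The second proposal in the quoted description can be illustrated with a toy 
model. This is only a sketch under stated assumptions: ToyConsumer, 
ToySorter, ToyNativeConsumer and ToyMemoryManager are hypothetical stand-ins, 
not Spark's MemoryConsumer or TaskMemoryManager, and the fallback loop is one 
guess at how a `forceSpill` step might be wired in. The sizes are arbitrary 
units chosen so that the failing case grants 8 of the 40 requested, as in the 
log.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these classes are Spark or Gluten APIs.
final class SpillFallbackSketch {

    /** Hypothetical stand-in for a memory consumer. */
    abstract static class ToyConsumer {
        long used;

        /** Cooperative spill; returns the number of units released. */
        abstract long spill(long size, ToyConsumer trigger);

        /** Last-resort spill (the proposed forceSpill); releases nothing by default. */
        long forceSpill(long size) {
            return 0L;
        }
    }

    /** Mimics the sorter in the report: ignores spill requests from other consumers. */
    static final class ToySorter extends ToyConsumer {
        @Override
        long spill(long size, ToyConsumer trigger) {
            if (trigger != this) {
                return 0L; // no reading iterator yet, so nothing is released
            }
            return release(size);
        }

        @Override
        long forceSpill(long size) {
            return release(size);
        }

        private long release(long size) {
            long freed = Math.min(size, used);
            used -= freed;
            return freed;
        }
    }

    /** Mimics a native consumer that holds nothing it can spill. */
    static final class ToyNativeConsumer extends ToyConsumer {
        @Override
        long spill(long size, ToyConsumer trigger) {
            return 0L;
        }
    }

    static final class ToyMemoryManager {
        private final long capacity;
        private final boolean forceSpillEnabled;
        private final List<ToyConsumer> consumers = new ArrayList<>();

        ToyMemoryManager(long capacity, boolean forceSpillEnabled) {
            this.capacity = capacity;
            this.forceSpillEnabled = forceSpillEnabled;
        }

        void register(ToyConsumer consumer) {
            consumers.add(consumer);
        }

        private long free() {
            long total = 0L;
            for (ToyConsumer c : consumers) {
                total += c.used;
            }
            return capacity - total;
        }

        /** Returns the units granted, which may be fewer than requested. */
        long acquire(long size, ToyConsumer requester) {
            // Step 1: cooperative spill, as today.
            for (ToyConsumer c : consumers) {
                if (free() >= size) break;
                c.spill(size - free(), requester);
            }
            // Step 2 (assumed fallback): force other consumers to spill.
            if (forceSpillEnabled) {
                for (ToyConsumer c : consumers) {
                    if (free() >= size) break;
                    if (c != requester) c.forceSpill(size - free());
                }
            }
            long granted = Math.min(size, Math.max(free(), 0L));
            requester.used += granted;
            return granted;
        }
    }

    /** Sorter holds 92 of 100 units; the native consumer then asks for 40. */
    static long grantedWith(boolean forceSpillEnabled) {
        ToyMemoryManager manager = new ToyMemoryManager(100L, forceSpillEnabled);
        ToySorter sorter = new ToySorter();
        ToyNativeConsumer nativeConsumer = new ToyNativeConsumer();
        manager.register(sorter);
        manager.register(nativeConsumer);
        sorter.used = 92L;
        return manager.acquire(40L, nativeConsumer);
    }

    public static void main(String[] args) {
        System.out.println("without fallback: " + grantedWith(false));
        System.out.println("with fallback:    " + grantedWith(true));
    }
}
```

In this model the request is granted 8 units without the fallback and the 
full 40 with it. Because the forced spill runs only after every cooperative 
spill has failed, the sorter is not made to spill early when another consumer 
could have released memory, which is the drawback noted for the first 
proposal.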



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
