Jin Chengcheng created SPARK-48483:
--------------------------------------

             Summary: Allow UnsafeExternalSorter to spill when other consumer request memory
                 Key: SPARK-48483
                 URL: https://issues.apache.org/jira/browse/SPARK-48483
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 4.0.0
         Environment: Ubuntu
            Reporter: Jin Chengcheng
             Fix For: 4.0.0


The downstream project Gluten (a native Spark engine) hits an OOM exception.

 
{code:java}
24/04/27 11:42:59 ERROR [Executor task launch worker for task 403.0 in stage 4.0 (TID 91404)] nmm.ManagedReservationListener: Error reserving memory from target
org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget$OutOfMemoryException: Not enough spark off-heap execution memory. Acquired: 40.0 MiB, granted: 8.0 MiB. Try tweaking config option spark.memory.offHeap.size to get larger space to run this application. 
Current config settings: 
        spark.gluten.memory.offHeap.size.in.bytes=50.0 GiB
        spark.gluten.memory.task.offHeap.size.in.bytes=12.5 GiB
        spark.gluten.memory.conservative.task.offHeap.size.in.bytes=6.3 GiB
Memory consumer stats: 
        Task.91404:                                       Current used bytes:   12.5 GiB, peak bytes:        N/A
        +- org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@a7836d4: Current used bytes:   12.4 GiB, peak bytes:        N/A
        \- Gluten.Tree.194:                               Current used bytes:   56.0 MiB, peak bytes:   11.7 GiB
           \- root.194:                                   Current used bytes:   56.0 MiB, peak bytes:   11.7 GiB
              +- WholeStageIterator.194:                  Current used bytes:   32.0 MiB, peak bytes:    9.0 GiB
              |  \- single:                               Current used bytes:   23.0 MiB, peak bytes:    9.0 GiB
              |     +- task.Gluten_Stage_4_TID_91404:     Current used bytes:   23.0 MiB, peak bytes:    9.0 GiB
              |     |  +- node.3:                         Current used bytes:   21.0 MiB, peak bytes:    9.0 GiB
              |     |  |  +- op.3.1.0.HashBuild:          Current used bytes:   10.8 MiB, peak bytes:    8.5 GiB
              |     |  |  \- op.3.0.0.HashProbe:          Current used bytes:    9.2 MiB, peak bytes:   21.6 MiB
              |     |  +- node.5:                         Current used bytes: 1024.0 KiB, peak bytes:    2.0 MiB
              |     |  |  \- op.5.0.0.FilterProject:      Current used bytes:  129.4 KiB, peak bytes: 1232.0 KiB
              |     |  +- node.2:                         Current used bytes: 1024.0 KiB, peak bytes: 1024.0 KiB
              |     |  |  \- op.2.1.0.FilterProject:      Current used bytes:  128.4 KiB, peak bytes:  192.4 KiB
              |     |  +- node.1:                         Current used bytes:      0.0 B, peak bytes:      0.0 B
              |     |  |  \- op.1.1.0.ValueStream:        Current used bytes:      0.0 B, peak bytes:      0.0 B
              |     |  +- node.0:                         Current used bytes:      0.0 B, peak bytes:      0.0 B
              |     |  |  \- op.0.0.0.ValueStream:        Current used bytes:      0.0 B, peak bytes:      0.0 B
              |     |  \- node.4:                         Current used bytes:      0.0 B, peak bytes:      0.0 B
              |     |     \- op.4.0.0.FilterProject:      Current used bytes:      0.0 B, peak bytes:      0.0 B
              |     \- WholeStageIterator_default_leaf:   Current used bytes:      0.0 B, peak bytes:      0.0 B
              +- ArrowContextInstance.0:                  Current used bytes:    8.0 MiB, peak bytes:    8.0 MiB
              +- ColumnarToRow.2:                         Current used bytes:    8.0 MiB, peak bytes:   16.0 MiB
              |  \- single:                               Current used bytes:    6.0 MiB, peak bytes:    9.0 MiB
              |     \- ColumnarToRow_default_leaf:        Current used bytes:    6.0 MiB, peak bytes:    9.0 MiB
              +- ShuffleReader.3:                         Current used bytes:    8.0 MiB, peak bytes:   16.0 MiB
              |  \- single:                               Current used bytes:    2.0 MiB, peak bytes:    5.0 MiB
              |     \- ShuffleReader_default_leaf:        Current used bytes: 1408.0 KiB, peak bytes:    4.1 MiB
              +- OverAcquire.DummyTarget.392:             Current used bytes:      0.0 B, peak bytes:    4.8 MiB
              +- OverAcquire.DummyTarget.385:             Current used bytes:      0.0 B, peak bytes:    4.8 MiB
              +- OverAcquire.DummyTarget.389:             Current used bytes:      0.0 B, peak bytes:    2.7 GiB
              \- ArrowContextInstance.6:                  Current used bytes:      0.0 B, peak bytes:      0.0 B

        at org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.borrow(ThrowOnOomMemoryTarget.java:90)
        at org.apache.gluten.memory.nmm.ManagedReservationListener.reserve(ManagedReservationListener.java:43)
        at org.apache.gluten.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
        at org.apache.gluten.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:65)
        at org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
        at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
        at org.apache.gluten.utils.InvocationFlowProtection.hasNext(Iterators.scala:135)
        at org.apache.gluten.utils.IteratorCompleter.hasNext(Iterators.scala:69)
        at org.apache.gluten.utils.PayloadCloser.hasNext(Iterators.scala:35)
        at org.apache.gluten.utils.PipelineTimeAccumulator.hasNext(Iterators.scala:98)
        at org.apache.gluten.execution.VeloxColumnarToRowExec$$anon$1.hasNext(VeloxColumnarToRowExec.scala:131)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.TraversableOnce$FlattenOps$$anon$2.hasNext(TraversableOnce.scala:469)
        at org.apache.gluten.utils.InvocationFlowProtection.hasNext(Iterators.scala:135)
        at org.apache.gluten.utils.IteratorCompleter.hasNext(Iterators.scala:69)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
        at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:984)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:984)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1470)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
W20240427 11:42:59.897979 96780 HashBuild.cpp:503] Failed to reserve 37.37MB for memory pool op.3.1.0.HashBuild, usage: 10.85MB, reservation: 11.00MB
W20240427 11:43:00.021775 96780 HashProbe.cpp:1573] Can't reclaim from hash probe operator, state_[RUNNING], nonReclaimableSection_[0], op.3.0.0.HashProbe, usage: 10.20MB, node pool usage: 13.00MB
{code}
 

 

The cause is as follows. A user-defined MemoryConsumer (Gluten's) tries to acquire memory. At the same time, another MemoryConsumer, UnsafeExternalSorter, holds 12.4 GiB, about 99% of the task's 12.5 GiB. The sorter does not spill to disk, because the spill request was triggered by a different consumer.

 
{code:java}
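public long spill(long size, MemoryConsumer trigger) throws IOException {
  // Reason for not spilling: when another consumer is the trigger and the
  // sorter is not yet being read (readingIterator == null), nothing is released.
  if (trigger != this) {
    if (readingIterator != null) {
      return readingIterator.spill();
    }
    return 0L; // this should throw exception
  }
  // ... remainder of the method (spill the in-memory records when trigger == this)
}
{code}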
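The following toy model makes the branch structure concrete. It is not Spark code. `ToySorter`, `inMemoryUnits` and `readingIteratorSet` are hypothetical stand-ins for UnsafeExternalSorter's in-memory sorter and its `readingIterator` field, and memory is counted in abstract units. The model assumes that a spill frees everything the sorter holds. It shows that while rows are still being inserted (no reading iterator yet), a request triggered by another consumer frees nothing, however much the sorter holds.

```java
// Hypothetical toy model of the spill() check quoted above; not Spark code.
public class CurrentSpillRuleSketch {

    static final class ToySorter {
        long inMemoryUnits;         // stand-in for memory held by the in-memory sorter
        boolean readingIteratorSet; // stand-in for readingIterator != null

        ToySorter(long inMemoryUnits, boolean readingIteratorSet) {
            this.inMemoryUnits = inMemoryUnits;
            this.readingIteratorSet = readingIteratorSet;
        }

        /** Follows the branch structure of the quoted spill(); returns units freed. */
        long spill(long size, Object trigger) {
            if (trigger != this) {
                if (readingIteratorSet) {
                    return releaseAll(); // stand-in for readingIterator.spill()
                }
                return 0L; // another consumer asked while rows are still being inserted
            }
            return releaseAll(); // self-triggered: write buffered records to disk
        }

        private long releaseAll() {
            long freed = inMemoryUnits;
            inMemoryUnits = 0;
            return freed;
        }
    }

    public static void main(String[] args) {
        ToySorter sorter = new ToySorter(124, false); // holds 124 units, still inserting
        Object otherConsumer = new Object();
        System.out.println(sorter.spill(4, otherConsumer)); // prints 0
        System.out.println(sorter.spill(4, sorter));        // prints 124
    }
}
```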
 

 

Inside UnsafeExternalRowSorter, the work is split three ways:
inputIterator.hasNext() acquires memory through the user-defined MemoryConsumer.
insertRow() requests memory for the sorter and may spill it to disk.
sort() finally produces the sorted iterator.

 

So if producing the next UnsafeRow requires a large allocation, that allocation may fail. The user-defined consumer's request causes TaskMemoryManager to ask UnsafeExternalSorter to spill. However, the sorter has not produced its sorted iterator yet (readingIterator is null), so it releases nothing and reports that no memory was freed.
{code:java}
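public Iterator<InternalRow> sort(Iterator<UnsafeRow> inputIterator) throws IOException {
  while (inputIterator.hasNext()) {   // the upstream consumer may acquire memory here
    insertRow(inputIterator.next());  // the sorter requests memory and may spill itself
  }
  return sort();                      // builds the sorted iterator
}{code}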
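The interleaving above can be reproduced with a small simulation. Everything here is a hypothetical toy, not Spark or Gluten code, and memory is counted in abstract units. The roles are:
- `upstreamAcquire` plays the native allocation inside `inputIterator.hasNext()`.
- `sorterInsert` plays `insertRow`.
- The upstream consumer is assumed to release its working memory once each row is produced.

Because the sorter only spills when it is itself the trigger, its buffer grows until the upstream consumer can no longer be granted what it asks for.

```java
// Hypothetical toy simulation of the sort() loop above; units are abstract, not bytes.
public class SortLoopOomSketch {

    static final class ToyTask {
        final long limit;  // per-task execution memory limit
        long sorterUsed;   // held by the sorter (stand-in for UnsafeExternalSorter)
        long upstreamUsed; // held by the upstream consumer (stand-in for Gluten)

        ToyTask(long limit) {
            this.limit = limit;
        }

        long free() {
            return limit - sorterUsed - upstreamUsed;
        }

        // Request from another consumer: under the current rule the sorter releases
        // nothing for a foreign trigger, so only already-free memory can be granted.
        long upstreamAcquire(long need) {
            long granted = Math.min(need, free());
            upstreamUsed += granted;
            return granted;
        }

        // Sorter request: when memory runs out, the sorter spills its own buffer.
        void sorterInsert(long rowSize) {
            if (free() < rowSize) {
                sorterUsed = 0; // self-triggered spill to disk
            }
            sorterUsed += rowSize;
        }
    }

    /**
     * Runs the hasNext()/insertRow() loop. Returns the number of rows inserted
     * before the upstream consumer was granted less than it asked for, or -1
     * if all rows were produced and inserted.
     */
    static int run(long limit, int rows, long rowSize, long upstreamNeed) {
        ToyTask task = new ToyTask(limit);
        for (int i = 0; i < rows; i++) {
            long granted = task.upstreamAcquire(upstreamNeed); // inputIterator.hasNext()
            if (granted < upstreamNeed) {
                return i; // the upstream consumer would raise its OOM error here
            }
            task.upstreamUsed -= granted; // assumed: upstream frees working memory per row
            task.sorterInsert(rowSize);   // insertRow(inputIterator.next())
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(run(100, 20, 10, 25)); // prints 8
    }
}
```

Take a 100-unit task limit, 10-unit rows and 25 units of upstream working memory per row. The upstream request fails after 8 rows, while the sorter still holds 80 units it could have spilled.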
 

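One possible solution is to change the spill check to `if (trigger != this && readingIterator != null)`. A request from another consumer made before the sorted iterator exists would then fall through to the normal spill path. The drawback is that other consumers may still have memory to release. In that case, this sorter would spill earlier than necessary.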

 

Another option is to add `acquireMemoryExausted` and `forceSpill` to MemoryConsumer, and `acquireExecutionMemoryExausted` to TaskMemoryManager. These would force a consumer to spill, accepting the side effects, only when memory cannot be acquired from any of the available consumers through the normal path. In this proposal, only UnsafeExternalSorter's `forceSpill` would actually return memory.
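One way to read the second option is as a two-pass acquisition:
1. The task memory manager first asks the other consumers to spill cooperatively, as it does today.
2. Only if that still leaves the request short does it call a new `forceSpill` on them.

The sketch below is a toy under that reading. Apart from the proposed name `forceSpill`, every class and method is hypothetical, and none of it is Spark's API. Only the sorter overrides `forceSpill`, and a consumer's `used` field is treated as the source of truth for memory accounting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a last-resort forceSpill pass; not Spark's API.
public class ForceSpillSketch {

    static class ToyConsumer {
        long used; // units currently held by this consumer

        ToyConsumer(long used) {
            this.used = used;
        }

        /** Cooperative spill requested on behalf of another consumer; frees nothing by default. */
        long spill(long size, ToyConsumer trigger) {
            return 0L;
        }

        /** Proposed last-resort spill; frees nothing by default. */
        long forceSpill(long size) {
            return 0L;
        }
    }

    /** Sorter that keeps today's behaviour in spill() but honours forceSpill(). */
    static final class ToySorter extends ToyConsumer {
        ToySorter(long used) {
            super(used);
        }

        @Override
        long spill(long size, ToyConsumer trigger) {
            return trigger == this ? releaseAll() : 0L; // ignores foreign triggers
        }

        @Override
        long forceSpill(long size) {
            return releaseAll();
        }

        private long releaseAll() {
            long freed = used;
            used = 0;
            return freed;
        }
    }

    static final class ToyTaskMemoryManager {
        final long limit;
        final List<ToyConsumer> consumers = new ArrayList<>();

        ToyTaskMemoryManager(long limit) {
            this.limit = limit;
        }

        long free() {
            long total = 0;
            for (ToyConsumer c : consumers) {
                total += c.used;
            }
            return limit - total;
        }

        /** Grants up to `need` units to `requester` and returns the amount granted. */
        long acquire(long need, ToyConsumer requester) {
            // Pass 1: cooperative spills from the other consumers, as today.
            for (ToyConsumer c : consumers) {
                if (free() >= need) break;
                if (c != requester) c.spill(need - free(), requester);
            }
            // Pass 2 (proposed): force spills only if pass 1 was not enough.
            for (ToyConsumer c : consumers) {
                if (free() >= need) break;
                if (c != requester) c.forceSpill(need - free());
            }
            long granted = Math.min(need, Math.max(0L, free()));
            requester.used += granted;
            return granted;
        }
    }

    public static void main(String[] args) {
        ToyTaskMemoryManager tmm = new ToyTaskMemoryManager(100);
        ToySorter sorter = new ToySorter(99);
        ToyConsumer upstream = new ToyConsumer(0);
        tmm.consumers.add(sorter);
        tmm.consumers.add(upstream);
        System.out.println(tmm.acquire(5, upstream)); // prints 5
        System.out.println(sorter.used);              // prints 0
    }
}
```

Keeping `forceSpill` in a separate second pass preserves today's preference for cooperative spills. The sorter is only pushed to disk once no other consumer can satisfy the request.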

Once a solution is agreed on, I can help implement it.

If you have better suggestions, please join the discussion. Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
