[
https://issues.apache.org/jira/browse/SPARK-48483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870074#comment-17870074
]
Jin Chengcheng commented on SPARK-48483:
----------------------------------------
Currently, by design, a Spark operator only spills itself, but some extensions
like Gluten may need other operators to be able to trigger that spill. Do you
have any suggestions?
> Allow UnsafeExternalSorter to spill when other consumer requests memory
> -----------------------------------------------------------------------
>
> Key: SPARK-48483
> URL: https://issues.apache.org/jira/browse/SPARK-48483
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 4.0.0
> Environment: Ubuntu
> Reporter: Jin Chengcheng
> Priority: Major
> Fix For: 4.0.0
>
>
> The downstream project Gluten (a native Spark engine) hits an OOM exception.
>
> {code:java}
> 24/04/27 11:42:59 ERROR [Executor task launch worker for task 403.0 in stage
> 4.0 (TID 91404)] nmm.ManagedReservationListener: Error reserving memory from
> target
> org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget$OutOfMemoryException:
> Not enough spark off-heap execution memory. Acquired: 40.0 MiB, granted: 8.0
> MiB. Try tweaking config option spark.memory.offHeap.size to get larger space
> to run this application.
> Current config settings:
> spark.gluten.memory.offHeap.size.in.bytes=50.0 GiB
> spark.gluten.memory.task.offHeap.size.in.bytes=12.5 GiB
> spark.gluten.memory.conservative.task.offHeap.size.in.bytes=6.3 GiB
> Memory consumer stats:
> Task.91404:
> Current used bytes: 12.5 GiB, peak bytes: N/A
> +-
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@a7836d4:
> Current used bytes: 12.4 GiB, peak bytes: N/A
> \- Gluten.Tree.194:
> Current used bytes: 56.0 MiB, peak bytes: 11.7 GiB
> \- root.194:
> Current used bytes: 56.0 MiB, peak bytes: 11.7 GiB
> +- WholeStageIterator.194:
> Current used bytes: 32.0 MiB, peak bytes: 9.0 GiB
> | \- single:
> Current used bytes: 23.0 MiB, peak bytes: 9.0 GiB
> | +- task.Gluten_Stage_4_TID_91404:
> Current used bytes: 23.0 MiB, peak bytes: 9.0 GiB
> | | +- node.3:
> Current used bytes: 21.0 MiB, peak bytes: 9.0 GiB
> | | | +- op.3.1.0.HashBuild:
> Current used bytes: 10.8 MiB, peak bytes: 8.5 GiB
> | | | \- op.3.0.0.HashProbe:
> Current used bytes: 9.2 MiB, peak bytes: 21.6 MiB
> | | +- node.5:
> Current used bytes: 1024.0 KiB, peak bytes: 2.0 MiB
> | | | \- op.5.0.0.FilterProject:
> Current used bytes: 129.4 KiB, peak bytes: 1232.0 KiB
> | | +- node.2:
> Current used bytes: 1024.0 KiB, peak bytes: 1024.0 KiB
> | | | \- op.2.1.0.FilterProject:
> Current used bytes: 128.4 KiB, peak bytes: 192.4 KiB
> | | +- node.1:
> Current used bytes: 0.0 B, peak bytes: 0.0 B
> | | | \- op.1.1.0.ValueStream:
> Current used bytes: 0.0 B, peak bytes: 0.0 B
> | | +- node.0:
> Current used bytes: 0.0 B, peak bytes: 0.0 B
> | | | \- op.0.0.0.ValueStream:
> Current used bytes: 0.0 B, peak bytes: 0.0 B
> | | \- node.4:
> Current used bytes: 0.0 B, peak bytes: 0.0 B
> | | \- op.4.0.0.FilterProject:
> Current used bytes: 0.0 B, peak bytes: 0.0 B
> | \- WholeStageIterator_default_leaf:
> Current used bytes: 0.0 B, peak bytes: 0.0 B
> +- ArrowContextInstance.0:
> Current used bytes: 8.0 MiB, peak bytes: 8.0 MiB
> +- ColumnarToRow.2:
> Current used bytes: 8.0 MiB, peak bytes: 16.0 MiB
> | \- single:
> Current used bytes: 6.0 MiB, peak bytes: 9.0 MiB
> | \- ColumnarToRow_default_leaf:
> Current used bytes: 6.0 MiB, peak bytes: 9.0 MiB
> +- ShuffleReader.3:
> Current used bytes: 8.0 MiB, peak bytes: 16.0 MiB
> | \- single:
> Current used bytes: 2.0 MiB, peak bytes: 5.0 MiB
> | \- ShuffleReader_default_leaf:
> Current used bytes: 1408.0 KiB, peak bytes: 4.1 MiB
> +- OverAcquire.DummyTarget.392:
> Current used bytes: 0.0 B, peak bytes: 4.8 MiB
> +- OverAcquire.DummyTarget.385:
> Current used bytes: 0.0 B, peak bytes: 4.8 MiB
> +- OverAcquire.DummyTarget.389:
> Current used bytes: 0.0 B, peak bytes: 2.7 GiB
> \- ArrowContextInstance.6:
> Current used bytes: 0.0 B, peak bytes: 0.0 B
> at
> org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.borrow(ThrowOnOomMemoryTarget.java:90)
> at
> org.apache.gluten.memory.nmm.ManagedReservationListener.reserve(ManagedReservationListener.java:43)
> at
> org.apache.gluten.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native
> Method)
> at
> org.apache.gluten.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:65)
> at
> org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
> at
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
> at
> org.apache.gluten.utils.InvocationFlowProtection.hasNext(Iterators.scala:135)
> at org.apache.gluten.utils.IteratorCompleter.hasNext(Iterators.scala:69)
> at org.apache.gluten.utils.PayloadCloser.hasNext(Iterators.scala:35)
> at
> org.apache.gluten.utils.PipelineTimeAccumulator.hasNext(Iterators.scala:98)
> at
> org.apache.gluten.execution.VeloxColumnarToRowExec$$anon$1.hasNext(VeloxColumnarToRowExec.scala:131)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at
> scala.collection.TraversableOnce$FlattenOps$$anon$2.hasNext(TraversableOnce.scala:469)
> at
> org.apache.gluten.utils.InvocationFlowProtection.hasNext(Iterators.scala:135)
> at org.apache.gluten.utils.IteratorCompleter.hasNext(Iterators.scala:69)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> at
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
> at
> org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:984)
> at
> org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:984)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:131)
> at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1470)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:750)
> W20240427 11:42:59.897979 96780 HashBuild.cpp:503] Failed to reserve 37.37MB
> for memory pool op.3.1.0.HashBuild, usage: 10.85MB, reservation: 11.00MB
> W20240427 11:43:00.021775 96780 HashProbe.cpp:1573] Can't reclaim from hash
> probe operator, state_[RUNNING], nonReclaimableSection_[0],
> op.3.0.0.HashProbe, usage: 10.20MB, node pool usage: 13.00MB {code}
>
>
> The cause is that a user-defined MemoryConsumer tries to acquire memory while
> another MemoryConsumer, UnsafeExternalSorter, holds 12.4 GiB (about 99% of the
> task memory) but does not spill it to disk, because the spill request comes
> from a different consumer.
>
> {code:java}
> public long spill(long size, MemoryConsumer trigger) throws IOException {
>   // Reason for not spilling: the request was triggered by another consumer
>   if (trigger != this) {
>     if (readingIterator != null) {
>       return readingIterator.spill();
>     }
>     return 0L; // this should throw exception
>   }
>   // ... (spill path for self-triggered requests elided)
> }{code}
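A minimal Java sketch of this failure mode, assuming a heavily simplified task memory model. `ToyTaskMemory`, `ToyConsumer`, `ToySorter` and `ToyNativeConsumer` are hypothetical classes, not Spark or Gluten APIs; `ToyTaskMemory.acquire` only loosely follows the idea of asking other consumers to spill before the requester, and the early return in `ToySorter.spill` mirrors the `spill()` snippet above. Sizes are abstract units.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of one task's memory budget.
// It only mimics the shape of MemoryConsumer.spill(size, trigger).
abstract class ToyConsumer {
  long used;

  abstract long spill(long size, ToyConsumer trigger);
}

class ToyTaskMemory {
  private final long capacity;
  private final List<ToyConsumer> consumers = new ArrayList<>();

  ToyTaskMemory(long capacity) {
    this.capacity = capacity;
  }

  void register(ToyConsumer consumer) {
    consumers.add(consumer);
  }

  long free() {
    long total = 0;
    for (ToyConsumer c : consumers) total += c.used;
    return capacity - total;
  }

  // Ask the other consumers to spill first, then the requester; grant what is free.
  long acquire(ToyConsumer requester, long size) {
    for (ToyConsumer c : consumers) {
      if (free() >= size) break;
      if (c != requester) c.spill(size - free(), requester);
    }
    if (free() < size) requester.spill(size - free(), requester);
    long granted = Math.min(size, Math.max(0L, free()));
    requester.used += granted;
    return granted;
  }
}

// Mirrors the early return in UnsafeExternalSorter.spill() quoted above.
class ToySorter extends ToyConsumer {
  Object readingIterator = null; // stays null until sort() builds the sorted iterator

  @Override
  long spill(long size, ToyConsumer trigger) {
    if (trigger != this) {
      if (readingIterator != null) {
        return releaseAll(); // stands in for readingIterator.spill()
      }
      return 0L; // foreign trigger and no reading iterator: nothing is released
    }
    return releaseAll(); // self-triggered: write the buffer to "disk"
  }

  private long releaseAll() {
    long released = used;
    used = 0;
    return released;
  }
}

// Stands in for a native consumer (such as Gluten's) that has nothing to spill here.
class ToyNativeConsumer extends ToyConsumer {
  @Override
  long spill(long size, ToyConsumer trigger) {
    return 0L;
  }
}
```

With the sorter holding 99 of 100 units, `acquire(nativeConsumer, 40)` is granted only 1 unit, the same shape as the "Acquired: 40.0 MiB, granted: 8.0 MiB" error in the log, while a self-triggered `sorter.spill(..., sorter)` would have released all 99.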
>
>
> In UnsafeExternalRowSorter, inputIterator.hasNext() acquires memory through the
> user-defined MemoryConsumer, insertRow() requests memory and may spill to disk,
> and sort() finally produces the sorted iterator.
>
> So if producing the next UnsafeRow requires a large allocation, it may fail.
> The user-defined consumer tries to obtain memory by asking UnsafeExternalSorter
> to spill, but the sorted (reading) iterator has not been created yet, so
> UnsafeExternalSorter releases no memory.
> {code:java}
> public Iterator<InternalRow> sort(Iterator<UnsafeRow> inputIterator) throws IOException {
>   while (inputIterator.hasNext()) {
>     insertRow(inputIterator.next());
>   }
>   return sort();
> }{code}
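The interleaving in this loop can be sketched with one shared budget. `SortLoopDemo` is hypothetical and uses abstract size units; it assumes the upstream allocation made while producing a row is transient, and it models only the two spill paths described above: `insertRow()` may spill because the sorter is its own trigger, while an allocation requested by the upstream consumer gets nothing back until a reading iterator exists.

```java
import java.util.Iterator;

// Hypothetical toy of the insert loop in UnsafeExternalRowSorter.sort(). One task
// budget is shared by the upstream producer (behind inputIterator.hasNext()) and
// the sorter's in-memory buffer (filled by insertRow()).
class SortLoopDemo {
  final long capacity;
  long sorterUsed = 0;                     // memory held by the sorter's buffer
  long spilledToDisk = 0;                  // units the sorter has written out
  boolean readingIteratorCreated = false;  // only becomes true once sort() finishes

  SortLoopDemo(long capacity) {
    this.capacity = capacity;
  }

  // Allocation made while producing the next row. Here the sorter is asked to
  // spill by ANOTHER consumer, so it only helps once a reading iterator exists.
  boolean upstreamAllocate(long bytes) {
    if (capacity - sorterUsed < bytes && readingIteratorCreated) {
      spilledToDisk += sorterUsed;
      sorterUsed = 0;
    }
    return capacity - sorterUsed >= bytes;
  }

  // insertRow(): the sorter is its own trigger, so it may spill to make room.
  void insertRow(long rowBytes) {
    if (capacity - sorterUsed < rowBytes) {
      spilledToDisk += sorterUsed;
      sorterUsed = 0;
    }
    sorterUsed += rowBytes;
  }

  // Returns how many rows were inserted before an upstream allocation failed.
  int sort(Iterator<Long> rowSizes, long upstreamBytesPerRow) {
    int inserted = 0;
    while (rowSizes.hasNext()) {
      if (!upstreamAllocate(upstreamBytesPerRow)) {
        return inserted; // corresponds to the OOM raised from hasNext() in the report
      }
      insertRow(rowSizes.next());
      inserted++;
    }
    readingIteratorCreated = true; // the real sort() would build the sorted iterator here
    return inserted;
  }
}
```

With a capacity of 100, five rows of 30 units and 40 units of upstream memory per row, `sort` stops after 3 rows with 90 units still buffered and nothing spilled. With only 5 units of upstream memory per row, all 5 rows are inserted, because `insertRow()` spills 90 units itself on the fourth row.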
>
> One possible solution is to change the spill check to `if (trigger != this &&
> readingIterator != null)`, but other consumers may have memory to release, in
> which case this sorter does not need to spill early.
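A hypothetical toy (`GuardedSorter`, not Spark's class) contrasting the current guard with the proposed one. To keep it short, it assumes every spill ignores the requested `size` and releases the whole buffer.

```java
// Hypothetical toy sorter contrasting the current spill() guard with the
// proposed `trigger != this && readingIterator != null` guard. It ignores the
// requested size and always releases the whole buffer.
class GuardedSorter {
  final boolean proposedGuard;
  long used;
  Object readingIterator = null; // null while rows are still being inserted
  int diskSpills = 0;            // every spill means extra disk I/O later

  GuardedSorter(boolean proposedGuard, long used) {
    this.proposedGuard = proposedGuard;
    this.used = used;
  }

  long spill(long size, Object trigger) {
    if (proposedGuard) {
      // Proposed: short-circuit only when a reading iterator can spill instead.
      if (trigger != this && readingIterator != null) {
        return releaseAll(); // stands in for readingIterator.spill()
      }
    } else if (trigger != this) {
      // Current: every foreign trigger short-circuits.
      if (readingIterator != null) {
        return releaseAll(); // stands in for readingIterator.spill()
      }
      return 0L;
    }
    return releaseAll(); // the sorter's own spill path
  }

  private long releaseAll() {
    long released = used;
    used = 0;
    diskSpills++;
    return released;
  }
}
```

Under the proposed guard, a request from another consumer with no reading iterator now frees the 99 buffered units, but it does so unconditionally, which is the early spill the report is concerned about when some other consumer could have released memory instead.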
>
> Another is to add methods named `acquireMemoryExausted` and `forceSpill` to
> MemoryConsumer, and `acquireExecutionMemoryExausted` to TaskMemoryManager. They
> would let a consumer be forced to spill, accepting the side effects, when memory
> cannot be acquired from any of the available consumers. Initially, only
> UnsafeExternalSorter would implement `forceSpill` to actually return memory.
> Once a solution is agreed on, I can help implement it.
> If you have better suggestions, please join the discussion. Thanks!
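One possible shape for the second proposal, written as a toy model. Only the names `forceSpill` and `acquireExecutionMemoryExausted` come from the report; `FsConsumer`, `FsTaskMemory`, `FsSorter`, `FsNativeConsumer`, all signatures, and the two-round control flow are assumptions for illustration, not an agreed Spark API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the second proposal. Signatures and control flow are
// assumptions; only the method names forceSpill and
// acquireExecutionMemoryExausted are taken from the report.
abstract class FsConsumer {
  long used;

  // Ordinary spill request; a consumer may decline, as the sorter does today.
  abstract long spill(long size, FsConsumer trigger);

  // Last-resort spill. By default nothing is released; only consumers that can
  // always spill (here, the sorter) override it.
  long forceSpill(long size) {
    return 0L;
  }
}

class FsTaskMemory {
  final long capacity;
  final List<FsConsumer> consumers = new ArrayList<>();

  FsTaskMemory(long capacity) {
    this.capacity = capacity;
  }

  long free() {
    long total = 0;
    for (FsConsumer c : consumers) total += c.used;
    return capacity - total;
  }

  long acquire(FsConsumer requester, long size) {
    // Round 1: ordinary spill requests, which consumers may refuse.
    for (FsConsumer c : consumers) {
      if (free() >= size) break;
      c.spill(size - free(), requester);
    }
    // Round 2: only when round 1 did not free enough, force consumers to spill.
    if (free() < size) acquireExecutionMemoryExausted(size);
    long granted = Math.min(size, Math.max(0L, free()));
    requester.used += granted;
    return granted;
  }

  private void acquireExecutionMemoryExausted(long size) {
    for (FsConsumer c : consumers) {
      if (free() >= size) return;
      c.forceSpill(size - free());
    }
  }
}

class FsSorter extends FsConsumer {
  Object readingIterator = null; // null until sort() has run

  @Override
  long spill(long size, FsConsumer trigger) {
    if (trigger != this && readingIterator == null) return 0L; // current refusal
    return forceSpill(size); // toy: any accepted spill releases the whole buffer
  }

  @Override
  long forceSpill(long size) {
    long released = used; // toy: write the whole buffer to disk
    used = 0;
    return released;
  }
}

class FsNativeConsumer extends FsConsumer {
  @Override
  long spill(long size, FsConsumer trigger) {
    return 0L; // the native side has nothing it can spill in this toy
  }
}
```

With the sorter holding 99 of 100 units, the ordinary round frees nothing, and the forced round makes the sorter release its buffer so a 40-unit request from the native consumer is granted in full. Running the forced round only after the ordinary round fails addresses the concern raised about the first solution: consumers that can release memory normally are asked first.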
--
This message was sent by Atlassian Jira
(v8.20.10#820010)