Jin Chengcheng created SPARK-48483:
--------------------------------------
Summary: Allow UnsafeExternalSorter to spill when another consumer requests memory
Key: SPARK-48483
URL: https://issues.apache.org/jira/browse/SPARK-48483
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 4.0.0
Environment: Ubuntu
Reporter: Jin Chengcheng
Fix For: 4.0.0
The downstream project Gluten (a native Spark engine) hits an OOM exception.
{code:java}
24/04/27 11:42:59 ERROR [Executor task launch worker for task 403.0 in stage 4.0 (TID 91404)] nmm.ManagedReservationListener: Error reserving memory from target
org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget$OutOfMemoryException: Not enough spark off-heap execution memory. Acquired: 40.0 MiB, granted: 8.0 MiB. Try tweaking config option spark.memory.offHeap.size to get larger space to run this application.
Current config settings:
spark.gluten.memory.offHeap.size.in.bytes=50.0 GiB
spark.gluten.memory.task.offHeap.size.in.bytes=12.5 GiB
spark.gluten.memory.conservative.task.offHeap.size.in.bytes=6.3 GiB
Memory consumer stats:
Task.91404:
Current used bytes: 12.5 GiB, peak bytes: N/A
+-
org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@a7836d4:
Current used bytes: 12.4 GiB, peak bytes: N/A
\- Gluten.Tree.194:
Current used bytes: 56.0 MiB, peak bytes: 11.7 GiB
\- root.194:
Current used bytes: 56.0 MiB, peak bytes: 11.7 GiB
+- WholeStageIterator.194:
Current used bytes: 32.0 MiB, peak bytes: 9.0 GiB
| \- single:
Current used bytes: 23.0 MiB, peak bytes: 9.0 GiB
| +- task.Gluten_Stage_4_TID_91404:
Current used bytes: 23.0 MiB, peak bytes: 9.0 GiB
| | +- node.3:
Current used bytes: 21.0 MiB, peak bytes: 9.0 GiB
| | | +- op.3.1.0.HashBuild:
Current used bytes: 10.8 MiB, peak bytes: 8.5 GiB
| | | \- op.3.0.0.HashProbe:
Current used bytes: 9.2 MiB, peak bytes: 21.6 MiB
| | +- node.5:
Current used bytes: 1024.0 KiB, peak bytes: 2.0 MiB
| | | \- op.5.0.0.FilterProject:
Current used bytes: 129.4 KiB, peak bytes: 1232.0 KiB
| | +- node.2:
Current used bytes: 1024.0 KiB, peak bytes: 1024.0 KiB
| | | \- op.2.1.0.FilterProject:
Current used bytes: 128.4 KiB, peak bytes: 192.4 KiB
| | +- node.1:
Current used bytes: 0.0 B, peak bytes: 0.0 B
| | | \- op.1.1.0.ValueStream:
Current used bytes: 0.0 B, peak bytes: 0.0 B
| | +- node.0:
Current used bytes: 0.0 B, peak bytes: 0.0 B
| | | \- op.0.0.0.ValueStream:
Current used bytes: 0.0 B, peak bytes: 0.0 B
| | \- node.4:
Current used bytes: 0.0 B, peak bytes: 0.0 B
| | \- op.4.0.0.FilterProject:
Current used bytes: 0.0 B, peak bytes: 0.0 B
| \- WholeStageIterator_default_leaf:
Current used bytes: 0.0 B, peak bytes: 0.0 B
+- ArrowContextInstance.0:
Current used bytes: 8.0 MiB, peak bytes: 8.0 MiB
+- ColumnarToRow.2:
Current used bytes: 8.0 MiB, peak bytes: 16.0 MiB
| \- single:
Current used bytes: 6.0 MiB, peak bytes: 9.0 MiB
| \- ColumnarToRow_default_leaf:
Current used bytes: 6.0 MiB, peak bytes: 9.0 MiB
+- ShuffleReader.3:
Current used bytes: 8.0 MiB, peak bytes: 16.0 MiB
| \- single:
Current used bytes: 2.0 MiB, peak bytes: 5.0 MiB
| \- ShuffleReader_default_leaf:
Current used bytes: 1408.0 KiB, peak bytes: 4.1 MiB
+- OverAcquire.DummyTarget.392:
Current used bytes: 0.0 B, peak bytes: 4.8 MiB
+- OverAcquire.DummyTarget.385:
Current used bytes: 0.0 B, peak bytes: 4.8 MiB
+- OverAcquire.DummyTarget.389:
Current used bytes: 0.0 B, peak bytes: 2.7 GiB
\- ArrowContextInstance.6:
Current used bytes: 0.0 B, peak bytes: 0.0 B
    at org.apache.gluten.memory.memtarget.ThrowOnOomMemoryTarget.borrow(ThrowOnOomMemoryTarget.java:90)
    at org.apache.gluten.memory.nmm.ManagedReservationListener.reserve(ManagedReservationListener.java:43)
    at org.apache.gluten.vectorized.ColumnarBatchOutIterator.nativeHasNext(Native Method)
    at org.apache.gluten.vectorized.ColumnarBatchOutIterator.hasNextInternal(ColumnarBatchOutIterator.java:65)
    at org.apache.gluten.vectorized.GeneralOutIterator.hasNext(GeneralOutIterator.java:37)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:43)
    at org.apache.gluten.utils.InvocationFlowProtection.hasNext(Iterators.scala:135)
    at org.apache.gluten.utils.IteratorCompleter.hasNext(Iterators.scala:69)
    at org.apache.gluten.utils.PayloadCloser.hasNext(Iterators.scala:35)
    at org.apache.gluten.utils.PipelineTimeAccumulator.hasNext(Iterators.scala:98)
    at org.apache.gluten.execution.VeloxColumnarToRowExec$$anon$1.hasNext(VeloxColumnarToRowExec.scala:131)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.TraversableOnce$FlattenOps$$anon$2.hasNext(TraversableOnce.scala:469)
    at org.apache.gluten.utils.InvocationFlowProtection.hasNext(Iterators.scala:135)
    at org.apache.gluten.utils.IteratorCompleter.hasNext(Iterators.scala:69)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:225)
    at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:984)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:984)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1470)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
W20240427 11:42:59.897979 96780 HashBuild.cpp:503] Failed to reserve 37.37MB for memory pool op.3.1.0.HashBuild, usage: 10.85MB, reservation: 11.00MB
W20240427 11:43:00.021775 96780 HashProbe.cpp:1573] Can't reclaim from hash probe operator, state_[RUNNING], nonReclaimableSection_[0], op.3.0.0.HashProbe, usage: 10.20MB, node pool usage: 13.00MB
{code}
The cause is that a user-defined MemoryConsumer tries to acquire memory while
the UnsafeExternalSorter consumer holds 12.4 GiB, about 99% of the task
memory. The sorter does not spill to disk because the request was triggered by
a different consumer.
{code:java}
public long spill(long size, MemoryConsumer trigger) throws IOException {
  // Reason for not spilling: the request was triggered by another consumer.
  if (trigger != this) {
    if (readingIterator != null) {
      return readingIterator.spill();
    }
    return 0L; // this should throw exception
  }
  // ... rest of the method (self-triggered spill) omitted
}
{code}
In UnsafeExternalRowSorter, inputIterator.hasNext() acquires memory through the
user-defined MemoryConsumer, insertRow() requests memory and may spill to disk,
and sort() generates the sorted iterator.
So if producing an UnsafeRow requires a large allocation, that allocation may
fail. The user-defined consumer asks UnsafeExternalSorter to spill, but the
sorted iterator has not been created yet (readingIterator is still null), so
UnsafeExternalSorter reports that it released no memory.
{code:java}
public Iterator<InternalRow> sort(Iterator<UnsafeRow> inputIterator) throws IOException {
  while (inputIterator.hasNext()) {
    insertRow(inputIterator.next());
  }
  return sort();
}
{code}
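To make the failure mode concrete, here is a minimal toy model of the spill decision described above. It is a sketch, not Spark code: `SpillGateDemo`, `ToySorter`, `bufferedBytes` and `hasReadingIterator` are hypothetical names, the byte counts are arbitrary, and the toy releases all buffered memory in one step, which the real sorter does not necessarily do.

```java
// Toy model of the spill decision; these are NOT Spark's real classes.
final class SpillGateDemo {
    /** Hypothetical stand-in for a consumer such as UnsafeExternalSorter. */
    static final class ToySorter {
        long bufferedBytes;          // memory held by the rows inserted so far
        boolean hasReadingIterator;  // stays false until sort() has produced an iterator

        ToySorter(long bufferedBytes) {
            this.bufferedBytes = bufferedBytes;
        }

        /** Mirrors the quoted check and returns the number of bytes freed. */
        long spill(long size, Object trigger) {
            if (trigger != this) {
                if (hasReadingIterator) {
                    return release();
                }
                return 0L; // another consumer asked, but nothing is released
            }
            return release(); // self-triggered spill
        }

        // Simplification: the toy frees everything it holds in one step.
        private long release() {
            long freed = bufferedBytes;
            bufferedBytes = 0L;
            return freed;
        }
    }

    public static void main(String[] args) {
        ToySorter sorter = new ToySorter(12_400L);
        Object otherConsumer = new Object();
        // Before sort() there is no reading iterator, so a foreign request frees nothing.
        System.out.println("freed for other consumer: " + sorter.spill(40L, otherConsumer));
        // The same request triggered by the sorter itself does free memory.
        System.out.println("freed for the sorter:     " + sorter.spill(40L, sorter));
    }
}
```

In this model the first call frees nothing although the sorter holds almost all of the memory, which is the situation the log above reports.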
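One possible solution is to change the spill check to `if (trigger != this &&
readingIterator != null)`, so that a request from another consumer falls
through to the regular spill path when there is no reading iterator. The
drawback is that other consumers may still have memory to release, in which
case this sorter would spill earlier than necessary.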
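The effect of the relaxed check can be sketched on a toy model. This is only an illustration under assumptions: `RelaxedSpillCheckDemo`, `ToySorter`, `lastPath` and the byte counts are hypothetical, and both paths simply release the whole buffer.

```java
// Toy model of the relaxed spill check; NOT Spark's real UnsafeExternalSorter.
final class RelaxedSpillCheckDemo {
    static final class ToySorter {
        long bufferedBytes;          // memory held by the rows inserted so far
        boolean hasReadingIterator;  // stays false until sort() has produced an iterator
        String lastPath = "none";    // records which branch handled the last request

        ToySorter(long bufferedBytes) {
            this.bufferedBytes = bufferedBytes;
        }

        long spill(long size, Object trigger) {
            // Relaxed check: take the iterator path only when an iterator exists.
            if (trigger != this && hasReadingIterator) {
                lastPath = "readingIterator";
                return release();
            }
            // Everything else, including foreign requests before sort(),
            // falls through to the regular spill path.
            lastPath = "regular";
            return release();
        }

        // Simplification: the toy frees everything it holds in one step.
        private long release() {
            long freed = bufferedBytes;
            bufferedBytes = 0L;
            return freed;
        }
    }

    public static void main(String[] args) {
        ToySorter sorter = new ToySorter(12_400L);
        long freed = sorter.spill(40L, new Object());
        System.out.println("freed " + freed + " via " + sorter.lastPath + " path");
    }
}
```

The sketch also shows the drawback: the sorter gives up its buffer on the first foreign request, whether or not another consumer could have released memory instead.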
Another option is to add the methods `acquireMemoryExausted` and `forceSpill`
to MemoryConsumer, and `acquireExecutionMemoryExausted` to TaskMemoryManager.
These would let a consumer spill as a last resort, when memory cannot be
acquired from any of the available consumers. Initially, only the `forceSpill`
in UnsafeExternalSorter would return memory.
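A rough sketch of how such a last-resort pass might behave is shown below. Only the name `forceSpill` comes from the proposal; its signature, `ToyConsumer`, `ToySorter`, `ToyMemoryManager`, `acquire` and the byte counts are assumptions made for illustration and do not reflect the real MemoryConsumer or TaskMemoryManager APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy two-pass acquisition; NOT Spark's real TaskMemoryManager.
final class ForceSpillDemo {
    /** Hypothetical consumer interface; the forceSpill signature is an assumption. */
    interface ToyConsumer {
        long spill(long size, ToyConsumer trigger);         // ordinary spill, may free nothing
        default long forceSpill(long size) { return 0L; }   // last-resort spill, opt-in
    }

    static final class ToySorter implements ToyConsumer {
        long bufferedBytes;

        ToySorter(long bufferedBytes) {
            this.bufferedBytes = bufferedBytes;
        }

        @Override
        public long spill(long size, ToyConsumer trigger) {
            // Mirrors the behaviour before sort(): foreign requests free nothing.
            return trigger == this ? release() : 0L;
        }

        @Override
        public long forceSpill(long size) {
            return release();
        }

        private long release() {
            long freed = bufferedBytes;
            bufferedBytes = 0L;
            return freed;
        }
    }

    static final class ToyMemoryManager {
        private final List<ToyConsumer> consumers = new ArrayList<>();
        private long freeBytes;

        ToyMemoryManager(long freeBytes) {
            this.freeBytes = freeBytes;
        }

        void register(ToyConsumer consumer) {
            consumers.add(consumer);
        }

        /** Returns the bytes granted to the requester, at most `required`. */
        long acquire(long required, ToyConsumer requester) {
            // Pass 1: ordinary spill requests to the other consumers.
            for (ToyConsumer c : consumers) {
                if (freeBytes >= required) break;
                if (c != requester) freeBytes += c.spill(required - freeBytes, requester);
            }
            // Pass 2: runs only if pass 1 left the request unsatisfied.
            for (ToyConsumer c : consumers) {
                if (freeBytes >= required) break;
                if (c != requester) freeBytes += c.forceSpill(required - freeBytes);
            }
            long granted = Math.min(required, freeBytes);
            freeBytes -= granted;
            return granted;
        }
    }

    public static void main(String[] args) {
        ToyMemoryManager manager = new ToyMemoryManager(8L);
        ToySorter sorter = new ToySorter(12_400L);
        ToyConsumer nativeOperator = (size, trigger) -> 0L; // holds nothing it can spill
        manager.register(sorter);
        manager.register(nativeOperator);
        System.out.println("granted: " + manager.acquire(40L, nativeOperator));
    }
}
```

Keeping the forced pass separate means the sorter is asked to give up memory only after every ordinary spill request has failed, which avoids the early-spill drawback of simply relaxing the check.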
Once a solution is agreed on, I can help implement it.
Other suggestions are welcome, thanks!