This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 15e6369e946b [SPARK-48642][CORE] False SparkOutOfMemoryError caused by killing task on spilling
15e6369e946b is described below
commit 15e6369e946bd04ddf9e1039909fe97c60d35504
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jun 17 11:19:58 2024 -0700
[SPARK-48642][CORE] False SparkOutOfMemoryError caused by killing task on spilling
### What changes were proposed in this pull request?
Throw a `RuntimeException` instead of a `SparkOutOfMemoryError` when the underlying calls throw an `InterruptedIOException` in `TaskMemoryManager#trySpillAndAcquire`.
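In effect, this extends the existing `ClosedByInterruptException` special case to also cover `InterruptedIOException`. A minimal self-contained sketch of the classification (illustrative only, not the Spark source; the actual diff is below):
```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.ClosedByInterruptException;

// Illustrative sketch, not the Spark source: interrupt-style I/O failures
// raised while spilling indicate a task kill and are reported as a plain
// RuntimeException; any other I/O failure still escalates as an OOM-style error.
class SpillClassificationSketch {
  interface Spiller { void spill() throws IOException; }

  static void trySpill(Spiller consumer) {
    try {
      consumer.spill();
    } catch (ClosedByInterruptException | InterruptedIOException e) {
      // The task thread was interrupted (e.g. another speculative attempt
      // succeeded), so this is not a real memory problem.
      throw new RuntimeException(e.getMessage());
    } catch (IOException e) {
      // A genuine spill failure still surfaces as an out-of-memory condition.
      throw new OutOfMemoryError("error while calling spill(): " + e.getMessage());
    }
  }
}
```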
### Why are the changes needed?
A false `SparkOutOfMemoryError` was identified in our production Spark jobs; the case is similar to SPARK-20250.
```
2024-06-17 06:03:20 CST Executor INFO - Executor is trying to kill task 1580.1 in stage 48.0 (TID 59486), reason: another attempt succeeded
2024-06-17 06:03:20 CST TaskMemoryManager ERROR - error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7cfefcb7
java.io.InterruptedIOException: null
    at org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:234) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:272) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:251) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at java.io.DataInputStream.readInt(DataInputStream.java:393) ~[?:?]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:80) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.spill(UnsafeExternalSorter.java:626) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:204) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:227) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:190) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:317) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138) ~[spark-sql_2.12-3.3.1.45.jar:3.3.1.45]
    ...
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:332) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1638) ~[?:?]
    at org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:231) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    ... 111 more
2024-06-17 06:03:21 CST Executor ERROR - Exception in task 1580.1 in stage 48.0 (TID 59486)
org.apache.spark.memory.SparkOutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7cfefcb7 : null
    at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:253) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:190) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:317) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138) ~[spark-sql_2.12-3.3.1.45.jar:3.3.1.45]
    ...
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:332) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
```
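The trace shows the mechanism: the kill interrupts the task thread while `ReadAheadInputStream` is waiting for its asynchronous read to complete, and the interrupt surfaces from the spill path as an `InterruptedIOException`, which `trySpillAndAcquire` then misreported as an OOM. A minimal sketch of this interrupt-to-IOException translation (hypothetical; it mirrors the common `java.io` convention rather than quoting `ReadAheadInputStream` verbatim):
```java
import java.io.InterruptedIOException;

// Hypothetical sketch of the interrupt-to-IOException translation seen in
// the trace above; mirrors the common java.io convention rather than quoting
// ReadAheadInputStream verbatim.
final class AsyncWaitSketch {
  static void waitForAsyncReadComplete(Object lock) throws InterruptedIOException {
    synchronized (lock) {
      try {
        lock.wait(); // blocks until the async read thread signals completion
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt flag
        throw new InterruptedIOException();  // what the spill path observes
      }
    }
  }
}
```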
### Does this PR introduce _any_ user-facing change?
Yes. A task killed while spilling no longer reports a false `SparkOutOfMemoryError`, so the killed task's status is KILLED instead of FAILED.
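For illustration, a simplified model of why the exception type matters (hypothetical; not Spark's actual `Executor` logic): a fatal `OutOfMemoryError`, which `SparkOutOfMemoryError` extends, always marks the task FAILED, whereas a non-fatal exception thrown while a kill is in progress lets the kill reason determine the status:
```java
// Hypothetical, simplified model of status reporting; not Spark's actual
// Executor code. Fatal errors always mean FAILED, while a non-fatal
// exception raised during a kill is attributed to the kill itself.
final class TaskStatusSketch {
  static String taskStatus(Throwable t, boolean killRequested) {
    if (t instanceof OutOfMemoryError) {
      return "FAILED"; // SparkOutOfMemoryError extends OutOfMemoryError
    }
    return killRequested ? "KILLED" : "FAILED";
  }
}
```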
### How was this patch tested?
Covered by existing tests, which ensure the change breaks nothing.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47000 from pan3793/SPARK-48642.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 00a96bb42ec69793f6251bf718d997e21e87c824)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 83352611770f..08c080f5a5a1 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -18,6 +18,7 @@
 package org.apache.spark.memory;
 
 import javax.annotation.concurrent.GuardedBy;
+import java.io.InterruptedIOException;
 import java.io.IOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
@@ -242,7 +243,7 @@ public class TaskMemoryManager {
         cList.remove(idx);
         return 0;
       }
-    } catch (ClosedByInterruptException e) {
+    } catch (ClosedByInterruptException | InterruptedIOException e) {
       // This called by user to kill a task (e.g: speculative task).
       logger.error("error while calling spill() on " + consumerToSpill, e);
       throw new RuntimeException(e.getMessage());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]