This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 15e6369e946b [SPARK-48642][CORE] False SparkOutOfMemoryError caused by killing task on spilling
15e6369e946b is described below

commit 15e6369e946bd04ddf9e1039909fe97c60d35504
Author: Cheng Pan <[email protected]>
AuthorDate: Mon Jun 17 11:19:58 2024 -0700

    [SPARK-48642][CORE] False SparkOutOfMemoryError caused by killing task on spilling
    
    ### What changes were proposed in this pull request?
    
    Throw `RuntimeException` instead of `SparkOutOfMemoryError` when underlying calls throw `InterruptedIOException` in `TaskMemoryManager#trySpillAndAcquire`
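
    For illustration, the following is a minimal, self-contained sketch of the error-routing shape this change gives `trySpillAndAcquire`. It is hypothetical code, not the Spark source: the `spill` and `trySpillAndAcquire` helpers here are stand-ins that only mirror how the exceptions are rethrown.

    ```java
    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.nio.channels.ClosedByInterruptException;

    public class SpillErrorRoutingSketch {
      // Stand-in for a memory consumer's spill(); throws InterruptedIOException to
      // simulate a task that is interrupted (killed) while spilling.
      static long spill(boolean interrupted) throws IOException {
        if (interrupted) throw new InterruptedIOException("task killed while spilling");
        return 1024L;
      }

      static long trySpillAndAcquire(boolean interrupted) {
        try {
          return spill(interrupted);
        } catch (ClosedByInterruptException | InterruptedIOException e) {
          // A kill is in progress (e.g. another speculative attempt succeeded):
          // surface a plain RuntimeException instead of an OOM-style error.
          throw new RuntimeException(e.getMessage());
        } catch (IOException e) {
          // Any other spill failure still indicates a real problem.
          throw new IllegalStateException("error while calling spill(): " + e.getMessage(), e);
        }
      }

      public static void main(String[] args) {
        System.out.println(trySpillAndAcquire(false));   // normal path: 1024
        try {
          trySpillAndAcquire(true);
        } catch (RuntimeException e) {
          System.out.println("killed task surfaced as: " + e);
        }
      }
    }
    ```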
    
    ### Why are the changes needed?
    
    A false `SparkOutOfMemoryError` case was identified in our production Spark jobs, and it is similar to SPARK-20250
    
    ```
    2024-06-17 06:03:20 CST Executor INFO - Executor is trying to kill task 1580.1 in stage 48.0 (TID 59486), reason: another attempt succeeded
    2024-06-17 06:03:20 CST TaskMemoryManager ERROR - error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7cfefcb7
    java.io.InterruptedIOException: null
            at org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:234) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:272) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.io.ReadAheadInputStream.read(ReadAheadInputStream.java:251) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at java.io.DataInputStream.readInt(DataInputStream.java:393) ~[?:?]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:80) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.spill(UnsafeExternalSorter.java:626) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:204) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:227) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:190) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:317) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138) ~[spark-sql_2.12-3.3.1.45.jar:3.3.1.45]
            ...
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:332) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
            at java.lang.Thread.run(Thread.java:833) ~[?:?]
    Caused by: java.lang.InterruptedException
            at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1638) ~[?:?]
            at org.apache.spark.io.ReadAheadInputStream.waitForAsyncReadComplete(ReadAheadInputStream.java:231) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            ... 111 more
    2024-06-17 06:03:21 CST Executor ERROR - Exception in task 1580.1 in stage 48.0 (TID 59486)
    org.apache.spark.memory.SparkOutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@7cfefcb7 : null
            at org.apache.spark.memory.TaskMemoryManager.trySpillAndAcquire(TaskMemoryManager.java:253) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:190) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:317) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:116) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:431) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.allocateMemoryForRecordIfNecessary(UnsafeExternalSorter.java:450) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:485) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:138) ~[spark-sql_2.12-3.3.1.45.jar:3.3.1.45]
            ...
            at org.apache.spark.rdd.RDD.iterator(RDD.scala:332) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.scheduler.Task.run(Task.scala:136) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1508) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1.45.jar:3.3.1.45]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
            at java.lang.Thread.run(Thread.java:833) ~[?:?]
    ```
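
    What the log shows: the executor interrupts the task thread to kill it while the task is spilling; the spill path is blocked in `ReadAheadInputStream.waitForAsyncReadComplete`, the wait throws `InterruptedException`, and that is re-surfaced as an `InterruptedIOException` which propagates out of `spill()`. Before this change, `TaskMemoryManager#trySpillAndAcquire` treated it like any other spill failure and raised `SparkOutOfMemoryError`. Below is a hypothetical sketch of that interrupt-to-`InterruptedIOException` pattern (not the actual `ReadAheadInputStream` code):

    ```java
    import java.io.InterruptedIOException;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative only: a blocking wait that converts thread interruption into
    // InterruptedIOException, matching the "Caused by: java.lang.InterruptedException"
    // chain seen in the log above.
    class AsyncReadWaiterSketch {
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition asyncReadDone = lock.newCondition();
      private boolean readInProgress = true;

      void waitForAsyncReadComplete() throws InterruptedIOException {
        lock.lock();
        try {
          while (readInProgress) {
            asyncReadDone.await();   // blocks; throws InterruptedException when the task is killed
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();          // keep the interrupt status for callers
          InterruptedIOException iio = new InterruptedIOException();
          iio.initCause(e);                            // becomes the "Caused by" in the log
          throw iio;
        } finally {
          lock.unlock();
        }
      }

      void signalReadComplete() {
        lock.lock();
        try {
          readInProgress = false;
          asyncReadDone.signalAll();   // would be called by the background read-ahead thread
        } finally {
          lock.unlock();
        }
      }
    }
    ```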
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. A task that is killed while spilling no longer reports a false `SparkOutOfMemoryError`, so the killed task's status is KILLED instead of FAILED.
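
    The status difference follows from how exception types are classified: `SparkOutOfMemoryError` extends `OutOfMemoryError`, which is treated as a fatal error, so a kill in progress cannot downgrade it and the task ends up FAILED; a plain `RuntimeException` is non-fatal, so a task that is already being killed can be reported as KILLED. A hypothetical sketch of that classification (not the actual Executor code):

    ```java
    // Illustrative only: how the final task status could be derived from the
    // exception type and whether a kill was requested.
    enum TaskStatusSketch { KILLED, FAILED }

    final class StatusClassifierSketch {
      static TaskStatusSketch statusFor(Throwable t, boolean killRequested) {
        // OutOfMemoryError (including SparkOutOfMemoryError) is a VirtualMachineError,
        // i.e. fatal, so it can never be reported as a kill.
        boolean fatal = t instanceof VirtualMachineError;
        if (killRequested && !fatal) {
          return TaskStatusSketch.KILLED;   // e.g. the RuntimeException thrown after this change
        }
        return TaskStatusSketch.FAILED;     // e.g. the previous SparkOutOfMemoryError
      }
    }
    ```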
    
    ### How was this patch tested?
    
    Existing tests ensure the change breaks nothing.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47000 from pan3793/SPARK-48642.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 00a96bb42ec69793f6251bf718d997e21e87c824)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 83352611770f..08c080f5a5a1 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -18,6 +18,7 @@
 package org.apache.spark.memory;
 
 import javax.annotation.concurrent.GuardedBy;
+import java.io.InterruptedIOException;
 import java.io.IOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.Arrays;
@@ -242,7 +243,7 @@ public class TaskMemoryManager {
         cList.remove(idx);
         return 0;
       }
-    } catch (ClosedByInterruptException e) {
+    } catch (ClosedByInterruptException | InterruptedIOException e) {
       // This called by user to kill a task (e.g: speculative task).
       logger.error("error while calling spill() on " + consumerToSpill, e);
       throw new RuntimeException(e.getMessage());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
