This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e5036f18aa85 [SPARK-51512][CORE] Filter out null MapStatus when 
cleaning up shuffle data with ExternalShuffleService
e5036f18aa85 is described below

commit e5036f18aa85134a291c04ef519d41af54e1df3f
Author: Kun Wan <wanku...@163.com>
AuthorDate: Wed Apr 9 14:42:13 2025 +0800

    [SPARK-51512][CORE] Filter out null MapStatus when cleaning up shuffle data 
with ExternalShuffleService
    
    ### What changes were proposed in this pull request?
    
    When the application crashes unexpectedly, the registered map statuses may 
contain null values, therefore we should skip these null values when cleaning 
up shuffle data with ExternalShuffleService.
    
    ### Why are the changes needed?
    
    Small bug fix: without this change, cleaning up shuffle data after an unexpected application crash can hit a NullPointerException in `BlockManagerMasterEndpoint.removeShuffle` (see the stack trace below), because the registered map statuses may contain null entries.
    
    ```log
    25/03/14 15:41:35 ERROR ContextCleaner: Error cleaning shuffle 4
    org.apache.spark.SparkException: Exception thrown in awaitResult:
            at 
org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
            at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
            at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
            at 
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
            at 
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
            at 
org.apache.spark.storage.BlockManagerMaster.removeShuffle(BlockManagerMaster.scala:204)
            at 
org.apache.spark.shuffle.sort.io.LocalDiskShuffleDriverComponents.removeShuffle(LocalDiskShuffleDriverComponents.java:47)
            at 
org.apache.spark.ContextCleaner.doCleanupShuffle(ContextCleaner.scala:241)
            at 
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:203)
            at 
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:196)
            at scala.Option.foreach(Option.scala:437)
            at 
org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:196)
            at 
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1383)
            at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:190)
            at 
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:80)
    Caused by: java.lang.NullPointerException: Cannot invoke 
"org.apache.spark.scheduler.MapStatus.location()" because "mapStatus" is null
            at 
org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$3(BlockManagerMasterEndpoint.scala:423)
            at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1323)
            at 
org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$2(BlockManagerMasterEndpoint.scala:421)
            at 
org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$2$adapted(BlockManagerMasterEndpoint.scala:420)
            at 
org.apache.spark.ShuffleStatus.$anonfun$withMapStatuses$1(MapOutputTracker.scala:435)
            at 
org.apache.spark.ShuffleStatus.withReadLock(MapOutputTracker.scala:72)
            at 
org.apache.spark.ShuffleStatus.withMapStatuses(MapOutputTracker.scala:435)
            at 
org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$1(BlockManagerMasterEndpoint.scala:420)
            at 
org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$1$adapted(BlockManagerMasterEndpoint.scala:419)
            at scala.Option.foreach(Option.scala:437)
            at 
org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$removeShuffle(BlockManagerMasterEndpoint.scala:419)
            at 
org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:201)
            at 
org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:104)
            at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:216)
            at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
            at 
org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:76)
            at 
org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:42)
            at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
            at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
            at java.base/java.lang.Thread.run(Thread.java:1583)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Local test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50277 from wankunde/SPARK-51512.
    
    Authored-by: Kun Wan <wanku...@163.com>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit f4ce0ab8cee043d0b1a8d935fb13ce617d2de873)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index fa3aee0103a9..e33161e25f08 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -418,7 +418,7 @@ class BlockManagerMasterEndpoint(
     if (externalShuffleServiceRemoveShuffleEnabled) {
       mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus 
=>
         shuffleStatus.withMapStatuses { mapStatuses =>
-          mapStatuses.foreach { mapStatus =>
+          mapStatuses.filter(_ != null).foreach { mapStatus =>
             // Check if the executor has been deallocated
             if 
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
               val blocksToDel =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to