This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f4ce0ab8cee0 [SPARK-51512][CORE] Filter out null MapStatus when cleaning up shuffle data with ExternalShuffleService
f4ce0ab8cee0 is described below

commit f4ce0ab8cee043d0b1a8d935fb13ce617d2de873
Author: Kun Wan <wanku...@163.com>
AuthorDate: Wed Apr 9 14:42:13 2025 +0800

[SPARK-51512][CORE] Filter out null MapStatus when cleaning up shuffle data with ExternalShuffleService

### What changes were proposed in this pull request?

When the application crashes unexpectedly, the registered map statuses may contain null values; therefore, we should skip these null values when cleaning up shuffle data with ExternalShuffleService.

### Why are the changes needed?

Small bug fix. Without the null check, `BlockManagerMasterEndpoint.removeShuffle` dereferences a null `MapStatus` and throws a `NullPointerException`, so `ContextCleaner` fails to clean the shuffle and logs the error below:

```log
25/03/14 15:41:35 ERROR ContextCleaner: Error cleaning shuffle 4
org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
    at org.apache.spark.storage.BlockManagerMaster.removeShuffle(BlockManagerMaster.scala:204)
    at org.apache.spark.shuffle.sort.io.LocalDiskShuffleDriverComponents.removeShuffle(LocalDiskShuffleDriverComponents.java:47)
    at org.apache.spark.ContextCleaner.doCleanupShuffle(ContextCleaner.scala:241)
    at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3(ContextCleaner.scala:203)
    at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$3$adapted(ContextCleaner.scala:196)
    at scala.Option.foreach(Option.scala:437)
    at org.apache.spark.ContextCleaner.$anonfun$keepCleaning$1(ContextCleaner.scala:196)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1383)
    at
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:190)
    at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:80)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.spark.scheduler.MapStatus.location()" because "mapStatus" is null
    at org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$3(BlockManagerMasterEndpoint.scala:423)
    at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1323)
    at org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$2(BlockManagerMasterEndpoint.scala:421)
    at org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$2$adapted(BlockManagerMasterEndpoint.scala:420)
    at org.apache.spark.ShuffleStatus.$anonfun$withMapStatuses$1(MapOutputTracker.scala:435)
    at org.apache.spark.ShuffleStatus.withReadLock(MapOutputTracker.scala:72)
    at org.apache.spark.ShuffleStatus.withMapStatuses(MapOutputTracker.scala:435)
    at org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$1(BlockManagerMasterEndpoint.scala:420)
    at org.apache.spark.storage.BlockManagerMasterEndpoint.$anonfun$removeShuffle$1$adapted(BlockManagerMasterEndpoint.scala:419)
    at scala.Option.foreach(Option.scala:437)
    at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$removeShuffle(BlockManagerMasterEndpoint.scala:419)
    at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:201)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:104)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:216)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:76)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:42)
    at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Local test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #50277 from wankunde/SPARK-51512.

Authored-by: Kun Wan <wanku...@163.com>
Signed-off-by: Kent Yao <y...@apache.org>
---
 .../scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index fa3aee0103a9..e33161e25f08 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -418,7 +418,7 @@ class BlockManagerMasterEndpoint(
     if (externalShuffleServiceRemoveShuffleEnabled) {
       mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
         shuffleStatus.withMapStatuses { mapStatuses =>
-          mapStatuses.foreach { mapStatus =>
+          mapStatuses.filter(_ != null).foreach { mapStatus =>
             // Check if the executor has been deallocated
             if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
               val blocksToDel =

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org