[ https://issues.apache.org/jira/browse/SPARK-38101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-38101:
----------------------------------
    Affects Version/s: 3.3.0
                           (was: 3.3)

> MetadataFetchFailedException due to decommission block migrations
> -----------------------------------------------------------------
>
>                 Key: SPARK-38101
>                 URL: https://issues.apache.org/jira/browse/SPARK-38101
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.1.2, 3.1.3, 3.2.1, 3.3.0, 3.2.2
>            Reporter: Emil Ejbyfeldt
>            Priority: Major
>
> As noted in SPARK-34939, there is a race when using broadcast for the map output 
> statuses. Explanation from SPARK-34939:
> > After map statuses are broadcasted, the executors obtain the serialized 
> > broadcasted map statuses. If any fetch failure happens afterwards, the Spark 
> > scheduler invalidates the cached map statuses and destroys the broadcasted value of 
> > the map statuses. Then, when any executor tries to deserialize the serialized 
> > broadcasted map statuses and access the broadcasted value, an IOException will be 
> > thrown. Currently we don't catch it in MapOutputTrackerWorker, and the above 
> > exception will fail the application.
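> A minimal, purely illustrative sketch of the same failure mode using only public API (the real race involves Spark's internal map-status broadcast in MapOutputTracker, not a user broadcast):
> {code:scala}
> // Hypothetical stand-in: destroy a broadcast on the driver, then try to use
> // it, mirroring what the scheduler does to the map-status broadcast on a
> // fetch failure.
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder().appName("broadcast-destroy-sketch").getOrCreate()
> val sc = spark.sparkContext
>
> val bc = sc.broadcast(Array.fill(1024)(1)) // stands in for the serialized map statuses
> bc.destroy()                               // driver invalidates and destroys it
>
> try {
>   // Any further use of the destroyed broadcast fails, either when the task is
>   // serialized on the driver or when an executor tries to read its value; in
>   // the internal map-status path this surfaces as the IOException above.
>   sc.parallelize(1 to 4).map(_ => bc.value.length).collect()
> } catch {
>   case e: Exception => println(s"use after destroy failed as expected: ${e.getMessage}")
> }
> {code}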
> But when running with `spark.decommission.enabled=true` and 
> `spark.storage.decommission.shuffleBlocks.enabled=true`, there is another way 
> to hit this race: when a node is decommissioning and its shuffle blocks are 
> migrated. After each block has been migrated, an update is sent to the 
> driver and the cached map output statuses are invalidated.
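> For context, a minimal configuration sketch (illustrative only, limited to the two settings named above; any other decommission-related settings a deployment needs are omitted):
> {code:scala}
> // Illustrative: the configuration under which this decommission-triggered
> // variant of the race can occur.
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder()
>   .appName("decommission-shuffle-migration")
>   .config("spark.decommission.enabled", "true")                       // graceful executor decommissioning
>   .config("spark.storage.decommission.shuffleBlocks.enabled", "true") // migrate shuffle blocks off decommissioning nodes
>   .getOrCreate()
> {code}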
> Here are driver logs from when we hit the race condition, running with Spark 3.2.0:
> {code:java}
> 2022-01-28 03:20:12,409 INFO memory.MemoryStore: Block broadcast_27 stored as 
> values in memory (estimated size 5.5 MiB, free 11.0 GiB)
> 2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output for 
> 192108 to BlockManagerId(760, ip-10-231-63-204.ec2.internal, 34707, None)
> 2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output for 
> 179529 to BlockManagerId(743, ip-10-231-34-160.ec2.internal, 44225, None)
> 2022-01-28 03:20:12,414 INFO spark.ShuffleStatus: Updating map output for 
> 187194 to BlockManagerId(761, ip-10-231-43-219.ec2.internal, 39943, None)
> 2022-01-28 03:20:12,415 INFO spark.ShuffleStatus: Updating map output for 
> 190303 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, None)
> 2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output for 
> 192220 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, None)
> 2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output for 
> 182306 to BlockManagerId(688, ip-10-231-43-41.ec2.internal, 35967, None)
> 2022-01-28 03:20:12,417 INFO spark.ShuffleStatus: Updating map output for 
> 190387 to BlockManagerId(772, ip-10-231-55-173.ec2.internal, 35523, None)
> 2022-01-28 03:20:12,417 INFO memory.MemoryStore: Block broadcast_27_piece0 
> stored as bytes in memory (estimated size 4.0 MiB, free 10.9 GiB)
> 2022-01-28 03:20:12,417 INFO storage.BlockManagerInfo: Added 
> broadcast_27_piece0 in memory on ip-10-231-63-1.ec2.internal:34761 (size: 4.0 
> MiB, free: 11.0 GiB)
> 2022-01-28 03:20:12,418 INFO memory.MemoryStore: Block broadcast_27_piece1 
> stored as bytes in memory (estimated size 1520.4 KiB, free 10.9 GiB)
> 2022-01-28 03:20:12,418 INFO storage.BlockManagerInfo: Added 
> broadcast_27_piece1 in memory on ip-10-231-63-1.ec2.internal:34761 (size: 
> 1520.4 KiB, free: 11.0 GiB)
> 2022-01-28 03:20:12,418 INFO spark.MapOutputTracker: Broadcast outputstatuses 
> size = 416, actual size = 5747443
> 2022-01-28 03:20:12,419 INFO spark.ShuffleStatus: Updating map output for 
> 153389 to BlockManagerId(154, ip-10-231-42-104.ec2.internal, 44717, None)
> 2022-01-28 03:20:12,419 INFO broadcast.TorrentBroadcast: Destroying 
> Broadcast(27) (from updateMapOutput at BlockManagerMasterEndpoint.scala:594)
> 2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Added rdd_65_20310 on 
> disk on ip-10-231-32-25.ec2.internal:40657 (size: 77.6 MiB)
> 2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Removed 
> broadcast_27_piece0 on ip-10-231-63-1.ec2.internal:34761 in memory (size: 4.0 
> MiB, free: 11.0 GiB)
> {code}
> While the broadcast is being constructed, updates keep coming in, and the 
> broadcast is destroyed almost immediately. On this particular job we ended up 
> hitting the race condition many times; it caused ~18 task failures and 
> stage retries within 20 seconds, which pushed us past our stage retry limit and 
> failed the job.
> As far as I understand, this is the expected behavior for handling this case 
> after SPARK-34939, but when combined with decommissioning, hitting the race 
> seems a bit too common.
> We have observed this behavior running 3.2.0 and 3.2.1, but I think other 
> current versions are also affected.
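> As an aside (our assumption, not something verified in this report), the stage retry limit mentioned above is presumably the one controlled by spark.stage.maxConsecutiveAttempts (default 4), e.g.:
> {code:scala}
> // Illustrative only: the setting we assume governs the stage retry limit hit above.
> import org.apache.spark.SparkConf
>
> val conf = new SparkConf()
>   .set("spark.stage.maxConsecutiveAttempts", "8") // allow more consecutive stage attempts before aborting
> {code}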



