As noted in SPARK-34939, there is a race when using broadcast for the map
output statuses. Explanation from SPARK-34939:
> After map statuses are broadcasted and the executors obtain
> serialized broadcasted map statuses. If any fetch failure happens after,
> Spark scheduler invalidates cached map statuses and destroy broadcasted
> value of the map statuses. Then any executor trying to deserialize
> serialized broadcasted map statuses and access broadcasted value,
> IOException will be thrown. Currently we don't catch it in
> MapOutputTrackerWorker and above exception will fail the application.
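To make the timing concrete, here is a minimal, Spark-free sketch of that
destroy-while-read sequence in plain Scala. The names (`CachedStatuses`,
`destroy`, `read`) are made up for illustration and are not the actual
MapOutputTracker internals:

import java.io.IOException
import java.util.concurrent.atomic.AtomicReference

// Simplified stand-in for the broadcasted, serialized map output statuses.
final class CachedStatuses(payload: Array[Byte]) {
  private val ref = new AtomicReference[Array[Byte]](payload)
  // Driver side: invalidate the cache and destroy the broadcasted value.
  def destroy(): Unit = ref.set(null)
  // Executor side: read the broadcasted value; fails if already destroyed.
  def read(): Array[Byte] = {
    val bytes = ref.get()
    if (bytes == null) throw new IOException("broadcast value was destroyed")
    bytes
  }
}

object BroadcastRaceSketch {
  def main(args: Array[String]): Unit = {
    val cached = new CachedStatuses(Array.fill(8)(1.toByte))

    // "Executor": obtains a handle to the serialized statuses, then reads it
    // a moment later -- this gap is the window in which the race can hit.
    val executor = new Thread(() => {
      Thread.sleep(10)
      try {
        cached.read()
        println("executor: deserialized map statuses")
      } catch {
        case e: IOException =>
          // Nothing catches this on the executor, so the task fails.
          println(s"executor: failed with $e")
      }
    })
    executor.start()

    // "Driver": a fetch failure (or a migrated-block update) invalidates the
    // cached statuses and destroys the broadcast while the read is pending.
    cached.destroy()
    executor.join()
  }
}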
But when running with `spark.decommission.enabled=true` and
`spark.storage.decommission.shuffleBlocks.enabled=true` there is another
way to hit this race: when a node is decommissioning and its shuffle
blocks are migrated. After a block has been migrated, an update is sent to
the driver for each block and the map output caches are invalidated.
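For reference, this is roughly the setup under which we hit this second
path; a minimal sketch of just the two relevant settings (our real jobs set
much more than this):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")

val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()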
Here are the driver logs from when we hit the race condition, running Spark 3.2.0:
2022-01-28 03:20:12,409 INFO memory.MemoryStore: Block broadcast_27 stored as values in memory (estimated size 5.5 MiB, free 11.0 GiB)
2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output for 192108 to BlockManagerId(760, ip-10-231-63-204.ec2.internal, 34707, None)
2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output for 179529 to BlockManagerId(743, ip-10-231-34-160.ec2.internal, 44225, None)
2022-01-28 03:20:12,414 INFO spark.ShuffleStatus: Updating map output for 187194 to BlockManagerId(761, ip-10-231-43-219.ec2.internal, 39943, None)
2022-01-28 03:20:12,415 INFO spark.ShuffleStatus: Updating map output for 190303 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, None)
2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output for 192220 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, None)
2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output for 182306 to BlockManagerId(688, ip-10-231-43-41.ec2.internal, 35967, None)
2022-01-28 03:20:12,417 INFO spark.ShuffleStatus: Updating map output for 190387 to BlockManagerId(772, ip-10-231-55-173.ec2.internal, 35523, None)
2022-01-28 03:20:12,417 INFO memory.MemoryStore: Block broadcast_27_piece0 stored as bytes in memory (estimated size 4.0 MiB, free 10.9 GiB)
2022-01-28 03:20:12,417 INFO storage.BlockManagerInfo: Added broadcast_27_piece0 in memory on ip-10-231-63-1.ec2.internal:34761 (size: 4.0 MiB, free: 11.0 GiB)
2022-01-28 03:20:12,418 INFO memory.MemoryStore: Block broadcast_27_piece1 stored as bytes in memory (estimated size 1520.4 KiB, free 10.9 GiB)
2022-01-28 03:20:12,418 INFO storage.BlockManagerInfo: Added broadcast_27_piece1 in memory on ip-10-231-63-1.ec2.internal:34761 (size: 1520.4 KiB, free: 11.0 GiB)
2022-01-28 03:20:12,418 INFO spark.MapOutputTracker: Broadcast outputstatuses size = 416, actual size = 5747443
2022-01-28 03:20:12,419 INFO spark.ShuffleStatus: Updating map output for 153389 to BlockManagerId(154, ip-10-231-42-104.ec2.internal, 44717, None)
2022-01-28 03:20:12,419 INFO broadcast.TorrentBroadcast: Destroying Broadcast(27) (from updateMapOutput at BlockManagerMasterEndpoint.scala:594)
2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Added rdd_65_20310 on disk on ip-10-231-32-25.ec2.internal:40657 (size: 77.6 MiB)
2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Removed broadcast_27_piece0 on ip-10-231-63-1.ec2.internal:34761 in memory (size: 4.0 MiB, free: 11.0 GiB)
While the broadcast is being constructed, updates keep coming in and
the broadcast is destroyed almost immediately. On this particular job we
ended up hitting the race condition many times: it caused ~18 task
failures and stage retries within 20 seconds, which pushed us over our
stage retry limit and failed the job.
As far as I understand, this was the expected behavior for handling this
case after SPARK-34939, but it seems that, when combined with
decommissioning, hitting the race is a bit too common.
Is anyone else running into something similar?
I'm willing to help develop a fix, but I might need some guidance on how
this case could be handled better.
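One direction I could imagine, purely as a sketch and not a worked-out
patch: on the executor side, treat a destroyed broadcast as a transient
condition and refetch the serialized statuses from the driver instead of
failing the task. The helper names below (fetchSerializedStatuses,
deserialize) are hypothetical stand-ins, not the real MapOutputTracker API:

import java.io.IOException

object RetryOnDestroyedBroadcast {
  type Serialized = Array[Byte]

  // Hypothetical stand-ins for "ask the driver for the serialized statuses"
  // and "decode them (possibly via a broadcast that may have been destroyed)".
  def fetchSerializedStatuses(): Serialized = Array.emptyByteArray
  def deserialize(bytes: Serialized): Seq[String] = Seq.empty

  def getStatusesWithRetry(attemptsLeft: Int): Seq[String] = {
    val bytes = fetchSerializedStatuses()
    try {
      deserialize(bytes)
    } catch {
      case _: IOException if attemptsLeft > 0 =>
        // The broadcast backing `bytes` was destroyed between the fetch and
        // the read; the driver will have rebuilt it, so a bounded retry
        // should succeed instead of failing the task.
        getStatusesWithRetry(attemptsLeft - 1)
    }
  }
}

Whether a refetch like this belongs in MapOutputTrackerWorker or somewhere
closer to where the broadcast is read is exactly the kind of guidance I'd
appreciate.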