Thank you for sharing, Emil.

> I am willing to help develop a fix, but might need some guidance on
> how this case could be handled better.
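One possible direction, as a rough sketch only (this is not the actual
MapOutputTrackerWorker code, and `fetch` below is a placeholder for the
real driver round trip plus deserialization): instead of letting the
IOException from a destroyed broadcast fail the task, the worker-side
fetch could catch it and ask the driver again for the re-broadcasted
statuses. Something along these lines:

  import java.io.IOException

  object MapStatusRetrySketch {
    // Retry the map-status fetch when the broadcast was destroyed
    // between obtaining the serialized bytes and reading the value.
    def fetchWithRetry[T](maxAttempts: Int)(fetch: () => T): T = {
      var attempt = 0
      while (true) {
        attempt += 1
        try {
          return fetch()
        } catch {
          case _: IOException if attempt < maxAttempts =>
            // The broadcast was likely invalidated by a concurrent
            // update (fetch failure or decommission migration); the
            // next attempt should see the freshly broadcasted value.
        }
      }
      throw new IllegalStateException("unreachable")
    }
  }

Once the retries are exhausted we could still fall back to today's
behavior (fail and recompute), so this would only smooth over the
transient window visible in your logs. Whether that is the right
trade-off is something we should discuss on the PR.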
Could you file an official Apache JIRA for your finding, and also
propose a PR for it with a test case? We can continue our discussion on
your PR.

Dongjoon.

On Wed, Feb 2, 2022 at 3:59 AM Emil Ejbyfeldt
<eejbyfe...@liveintent.com.invalid> wrote:

> As noted in SPARK-34939 there is a race when using broadcast for the
> map output statuses. Explanation from SPARK-34939:
>
> > After map statuses are broadcasted and the executors obtain the
> > serialized broadcasted map statuses, if any fetch failure happens
> > afterwards, the Spark scheduler invalidates the cached map statuses
> > and destroys the broadcasted value of the map statuses. Any executor
> > that then tries to deserialize the serialized broadcasted map
> > statuses and access the broadcasted value hits an IOException.
> > Currently we don't catch it in MapOutputTrackerWorker, and the
> > exception fails the application.
>
> But when running with `spark.decommission.enabled=true` and
> `spark.storage.decommission.shuffleBlocks.enabled=true` there is
> another way to hit this race: when a node is decommissioning and its
> shuffle blocks are migrated. After a block has been migrated, an
> update is sent to the driver for each block and the cached map output
> statuses are invalidated.
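> For reference, in spark-defaults.conf form the two settings involved
> are exactly the ones above (nothing beyond what is already described):
>
>   spark.decommission.enabled                        true
>   spark.storage.decommission.shuffleBlocks.enabled  true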
> Here are driver logs from when we hit the race condition, running
> with Spark 3.2.0:
>
> 2022-01-28 03:20:12,409 INFO memory.MemoryStore: Block broadcast_27 stored as values in memory (estimated size 5.5 MiB, free 11.0 GiB)
> 2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output for 192108 to BlockManagerId(760, ip-10-231-63-204.ec2.internal, 34707, None)
> 2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output for 179529 to BlockManagerId(743, ip-10-231-34-160.ec2.internal, 44225, None)
> 2022-01-28 03:20:12,414 INFO spark.ShuffleStatus: Updating map output for 187194 to BlockManagerId(761, ip-10-231-43-219.ec2.internal, 39943, None)
> 2022-01-28 03:20:12,415 INFO spark.ShuffleStatus: Updating map output for 190303 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, None)
> 2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output for 192220 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965, None)
> 2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output for 182306 to BlockManagerId(688, ip-10-231-43-41.ec2.internal, 35967, None)
> 2022-01-28 03:20:12,417 INFO spark.ShuffleStatus: Updating map output for 190387 to BlockManagerId(772, ip-10-231-55-173.ec2.internal, 35523, None)
> 2022-01-28 03:20:12,417 INFO memory.MemoryStore: Block broadcast_27_piece0 stored as bytes in memory (estimated size 4.0 MiB, free 10.9 GiB)
> 2022-01-28 03:20:12,417 INFO storage.BlockManagerInfo: Added broadcast_27_piece0 in memory on ip-10-231-63-1.ec2.internal:34761 (size: 4.0 MiB, free: 11.0 GiB)
> 2022-01-28 03:20:12,418 INFO memory.MemoryStore: Block broadcast_27_piece1 stored as bytes in memory (estimated size 1520.4 KiB, free 10.9 GiB)
> 2022-01-28 03:20:12,418 INFO storage.BlockManagerInfo: Added broadcast_27_piece1 in memory on ip-10-231-63-1.ec2.internal:34761 (size: 1520.4 KiB, free: 11.0 GiB)
> 2022-01-28 03:20:12,418 INFO spark.MapOutputTracker: Broadcast outputstatuses size = 416, actual size = 5747443
> 2022-01-28 03:20:12,419 INFO spark.ShuffleStatus: Updating map output for 153389 to BlockManagerId(154, ip-10-231-42-104.ec2.internal, 44717, None)
> 2022-01-28 03:20:12,419 INFO broadcast.TorrentBroadcast: Destroying Broadcast(27) (from updateMapOutput at BlockManagerMasterEndpoint.scala:594)
> 2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Added rdd_65_20310 on disk on ip-10-231-32-25.ec2.internal:40657 (size: 77.6 MiB)
> 2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Removed broadcast_27_piece0 on ip-10-231-63-1.ec2.internal:34761 in memory (size: 4.0 MiB, free: 11.0 GiB)
>
> While the broadcast is being constructed, updates keep coming in and
> the broadcast is destroyed almost immediately. On this particular job
> we hit the race condition many times: it caused ~18 task failures and
> stage retries within 20 seconds, which pushed us past our stage retry
> limit and failed the job.
>
> As far as I understand, this is the expected behavior for this case
> after SPARK-34939, but when combined with decommissioning, hitting
> the race seems to be far too common.
>
> Is anyone else running into something similar?
>
> I am willing to help develop a fix, but might need some guidance on
> how this case could be handled better.
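> P.S. The destroyed-broadcast behavior itself is easy to see locally
> with a toy snippet (this is just the public Broadcast API, not the
> map-status path; on the driver the access fails fast with a
> SparkException, while on executors the same invalidation surfaces as
> the IOException described above):
>
>   import org.apache.spark.SparkException
>   import org.apache.spark.sql.SparkSession
>
>   val spark = SparkSession.builder()
>     .master("local[2]")
>     .appName("broadcast-destroy-repro")
>     .getOrCreate()
>
>   val bc = spark.sparkContext.broadcast(Array.fill(1 << 20)(0))
>   bc.destroy()
>   // Any further access to the destroyed broadcast fails.
>   try bc.value
>   catch { case e: SparkException => println(e.getMessage) }
>   spark.stop()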