Thank you for sharing, Emil.

> I'm willing to help develop a fix, but might need some guidance on
> how this case could be handled better.

Could you file an official Apache JIRA for your finding and also
propose a PR for it with a test case? We can continue
our discussion on your PR.

Dongjoon.



On Wed, Feb 2, 2022 at 3:59 AM Emil Ejbyfeldt
<eejbyfe...@liveintent.com.invalid> wrote:

> As noted in SPARK-34939, there is a race when using broadcast for the
> map output statuses. Explanation from SPARK-34939:
>
> > After the map statuses are broadcasted and the executors have
> > obtained the serialized broadcasted map statuses, if any fetch
> > failure happens afterwards, the Spark scheduler invalidates the
> > cached map statuses and destroys the broadcasted value of the map
> > statuses. Then, when any executor tries to deserialize the
> > serialized broadcasted map statuses and access the broadcasted
> > value, an IOException will be thrown. Currently we don't catch it
> > in MapOutputTrackerWorker and the above exception will fail the
> > application.
>
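> To illustrate the underlying mechanism with just the public API
> (assuming sc is a SparkContext; this is only a sketch of the generic
> use-after-destroy behaviour, not the internal map-status path that
> actually throws the IOException above):
>
>   // The driver destroys the broadcast, as updateMapOutput does in the
>   // log below, while tasks still reference it.
>   val bc = sc.broadcast(Array.fill(1 << 20)(1))
>   bc.destroy()
>   // Any later use of bc fails with an "Attempted to use Broadcast
>   // after it was destroyed" error.
>   sc.parallelize(1 to 4).map(_ => bc.value.length).collect()
>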
> But when running with `spark.decommission.enabled=true` and
> `spark.storage.decommission.shuffleBlocks.enabled=true` there is
> another way to hit this race: when a node is decommissioning and its
> shuffle blocks are migrated. After each block has been migrated, an
> update is sent to the driver and the cached map output statuses are
> invalidated.
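>
> For reference, the flags involved look roughly like this, assuming a
> plain SparkConf setup (the last one is not something we changed; it is
> just the threshold above which the map statuses get broadcast at all,
> 512k by default if I remember correctly):
>
>   import org.apache.spark.SparkConf
>
>   val conf = new SparkConf()
>     .set("spark.decommission.enabled", "true")
>     .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
>     // default shown for context; above this size the map output
>     // statuses are distributed via a broadcast
>     .set("spark.shuffle.mapOutput.minSizeForBroadcast", "512k")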
>
> Here is the driver log from when we hit the race condition, running
> Spark 3.2.0:
>
> 2022-01-28 03:20:12,409 INFO memory.MemoryStore: Block broadcast_27
> stored as values in memory (estimated size 5.5 MiB, free 11.0 GiB)
> 2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output
> for 192108 to BlockManagerId(760, ip-10-231-63-204.ec2.internal, 34707,
> None)
> 2022-01-28 03:20:12,410 INFO spark.ShuffleStatus: Updating map output
> for 179529 to BlockManagerId(743, ip-10-231-34-160.ec2.internal, 44225,
> None)
> 2022-01-28 03:20:12,414 INFO spark.ShuffleStatus: Updating map output
> for 187194 to BlockManagerId(761, ip-10-231-43-219.ec2.internal, 39943,
> None)
> 2022-01-28 03:20:12,415 INFO spark.ShuffleStatus: Updating map output
> for 190303 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965,
> None)
> 2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output
> for 192220 to BlockManagerId(270, ip-10-231-33-206.ec2.internal, 38965,
> None)
> 2022-01-28 03:20:12,416 INFO spark.ShuffleStatus: Updating map output
> for 182306 to BlockManagerId(688, ip-10-231-43-41.ec2.internal, 35967,
> None)
> 2022-01-28 03:20:12,417 INFO spark.ShuffleStatus: Updating map output
> for 190387 to BlockManagerId(772, ip-10-231-55-173.ec2.internal, 35523,
> None)
> 2022-01-28 03:20:12,417 INFO memory.MemoryStore: Block
> broadcast_27_piece0 stored as bytes in memory (estimated size 4.0 MiB,
> free 10.9 GiB)
> 2022-01-28 03:20:12,417 INFO storage.BlockManagerInfo: Added
> broadcast_27_piece0 in memory on ip-10-231-63-1.ec2.internal:34761
> (size: 4.0 MiB, free: 11.0 GiB)
> 2022-01-28 03:20:12,418 INFO memory.MemoryStore: Block
> broadcast_27_piece1 stored as bytes in memory (estimated size 1520.4
> KiB, free 10.9 GiB)
> 2022-01-28 03:20:12,418 INFO storage.BlockManagerInfo: Added
> broadcast_27_piece1 in memory on ip-10-231-63-1.ec2.internal:34761
> (size: 1520.4 KiB, free: 11.0 GiB)
> 2022-01-28 03:20:12,418 INFO spark.MapOutputTracker: Broadcast
> outputstatuses size = 416, actual size = 5747443
> 2022-01-28 03:20:12,419 INFO spark.ShuffleStatus: Updating map output
> for 153389 to BlockManagerId(154, ip-10-231-42-104.ec2.internal, 44717,
> None)
> 2022-01-28 03:20:12,419 INFO broadcast.TorrentBroadcast: Destroying
> Broadcast(27) (from updateMapOutput at
> BlockManagerMasterEndpoint.scala:594)
> 2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Added
> rdd_65_20310 on disk on ip-10-231-32-25.ec2.internal:40657 (size: 77.6 MiB)
> 2022-01-28 03:20:12,427 INFO storage.BlockManagerInfo: Removed
> broadcast_27_piece0 on ip-10-231-63-1.ec2.internal:34761 in memory
> (size: 4.0 MiB, free: 11.0 GiB)
>
> While the broadcast is being constructed, updates keep coming in and
> the broadcast is destroyed almost immediately. On this particular job
> we ended up hitting the race condition many times: it caused ~18 task
> failures and stage retries within 20 seconds, which made us hit our
> stage retry limit and caused the job to fail.
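>
> (For reference, the stage retry limit we hit is
> spark.stage.maxConsecutiveAttempts, which I believe defaults to 4;
> bumping it, e.g. .set("spark.stage.maxConsecutiveAttempts", "10"),
> would of course only paper over the race.)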
>
> As far as I understand, this is the expected behavior for handling
> this case after SPARK-34939. But it seems that, when combined with
> decommissioning, hitting the race is a bit too common.
>
> Is anyone else running into something similar?
>
> I'm willing to help develop a fix, but might need some guidance on
> how this case could be handled better.
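>
> One direction I could imagine (very much a sketch; the helper names
> below are made up, not the real internals) would be to fall back to
> fetching the statuses directly from the driver when the broadcast
> turns out to have been destroyed, instead of surfacing a fetch failure
> that fails the task:
>
>   import java.io.IOException
>
>   // Hypothetical stand-ins for the real internals, only to show the
>   // shape of the fallback.
>   def readBroadcastedStatuses(): Array[Byte] =
>     throw new IOException("broadcast was destroyed")  // simulates the race
>   def fetchStatusesFromDriver(): Array[Byte] =
>     Array.emptyByteArray                              // direct, non-broadcast fetch
>
>   // Fall back to the direct fetch instead of failing the task.
>   def getStatuses(): Array[Byte] =
>     try readBroadcastedStatuses()
>     catch { case _: IOException => fetchStatusesFromDriver() }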
>
