[ 
https://issues.apache.org/jira/browse/SPARK-36782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Thiele updated SPARK-36782:
----------------------------------
    Description: 
I can observe a deadlock on the driver that can be triggered rather reliably in 
a job with a large number of tasks, upon using
{code:java}
spark.decommission.enabled: true
spark.storage.decommission.rddBlocks.enabled: true
spark.storage.decommission.shuffleBlocks.enabled: true
spark.storage.decommission.enabled: true{code}
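For reference, the same settings can be passed at submit time. This is a deployment-specific sketch; the application jar name is a placeholder:
{code}
# Same decommissioning settings as above, passed via spark-submit.
# "your-app.jar" is a placeholder, not part of the original report.
spark-submit \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  your-app.jar{code}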
It originates in the {{dispatcher-BlockManagerMaster}} making a call to 
{{updateBlockInfo}} when shuffles are migrated. This call is not performed by a 
thread from the pool but by the {{dispatcher-BlockManagerMaster}} itself, 
presumably under the assumption that it would be very fast. However, if the 
updated block is a shuffle index block, it calls
{code:java}
mapOutputTracker.updateMapOutput(shuffleId, mapId, blockManagerId){code}
which blocks while waiting to acquire the write lock of the {{MapOutputTracker}}.

If the timing is bad, one of the {{map-output-dispatchers}} is holding this 
lock as part of e.g. {{serializedMapStatus}}. That function calls 
{{MapOutputTracker.serializeOutputStatuses}}, and as part of that we do
{code:java}
if (arrSize >= minBroadcastSize) {
  // Use broadcast instead.
  // Important arr(0) is the tag == DIRECT, ignore that while deserializing !
  // arr is a nested Array so that it can handle over 2GB serialized data
  val arr = chunkedByteBuf.getChunks().map(_.array())
  val bcast = broadcastManager.newBroadcast(arr, isLocal){code}
which makes an RPC call to {{dispatcher-BlockManagerMaster}}. That endpoint, 
however, is unable to answer because it is blocked waiting on the 
aforementioned lock. Hence the deadlock. The ingredients of this deadlock are 
therefore: a serialized map-status array large enough to take the 
broadcast path, plus an incoming {{updateBlockInfo}} call at the wrong moment, 
which happens regularly during decommissioning. Versions earlier than 3.1.0 
are potentially affected as well, but I could not conclude that with 
sufficient certainty.
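The cycle above can be modeled with two threads sharing one lock. This is a minimal sketch in Python with illustrative stand-in names, not actual Spark APIs; timeouts stand in for the indefinite blocking seen in the real deadlock:
{code}
import threading

# Minimal model of the reported cycle; all names are illustrative stand-ins.
tracker_lock = threading.Lock()    # models MapOutputTracker's write lock
rpc_answered = threading.Event()   # reply from dispatcher-BlockManagerMaster
lock_taken = threading.Event()
outcome = {}

def map_output_dispatcher():
    # serializedMapStatus: holds the tracker lock while "broadcasting",
    # which requires an RPC round-trip to dispatcher-BlockManagerMaster.
    with tracker_lock:
        lock_taken.set()
        outcome["broadcast_ok"] = rpc_answered.wait(timeout=1.0)

def block_manager_master():
    # updateBlockInfo handled inline on the dispatcher thread: it blocks
    # on the same lock, so it can never answer the broadcast RPC.
    lock_taken.wait()
    got = tracker_lock.acquire(timeout=0.5)
    outcome["update_ok"] = got
    if got:
        tracker_lock.release()
        rpc_answered.set()  # a reply is only possible once the lock is free

threads = [threading.Thread(target=f)
           for f in (map_output_dispatcher, block_manager_master)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# Both operations time out: neither thread can make progress.{code}
In the real system there are no timeouts, so both threads hang forever instead of recording a failure.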

I have a stacktrace of all driver threads showing the deadlock: 
[^spark_stacktrace_deadlock.txt]

A coworker of mine wrote a patch that replicates the issue as a test case as 
well: [^0001-Add-test-showing-that-decommission-might-deadlock.patch]



> Deadlock between map-output-dispatcher and dispatcher-BlockManagerMaster upon 
> migrating shuffle blocks
> ------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-36782
>                 URL: https://issues.apache.org/jira/browse/SPARK-36782
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager
>    Affects Versions: 3.1.0, 3.1.1, 3.1.2, 3.2.0, 3.1.3, 3.3.0, 3.2.1, 3.3
>            Reporter: Fabian Thiele
>            Priority: Major
>         Attachments: 
> 0001-Add-test-showing-that-decommission-might-deadlock.patch, 
> spark_stacktrace_deadlock.txt
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
