[
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533075#comment-16533075
]
ASF GitHub Bot commented on FLINK-9676:
---------------------------------------
GitHub user NicoK opened a pull request:
https://github.com/apache/flink/pull/6257
[FLINK-9676][network] clarify contracts of
BufferListener#notifyBufferAvailable() and fix a deadlock
## What is the purpose of the change
A deadlock could occur when a `RemoteInputChannel` recycles one of its
exclusive buffers while other (floating) buffers are concurrently recycled to
the buffer pool, provided that the `RemoteInputChannel` is registered as a
listener to that buffer pool and adding the exclusive buffer triggers a
floating buffer to be recycled back to the same pool. Each thread then holds
one of the locks on `LocalBufferPool#availableMemorySegments` and
`RemoteInputChannel#bufferQueue` and tries to acquire the other, i.e. the two
locks are taken in reverse order.
One such instance would be (thanks @zhijiangW for finding this):
```
Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle
floating buffers
-> lock(LocalBufferPool#availableMemorySegments) ->
RemoteInputChannel2#notifyBufferAvailable
-> try to lock(RemoteInputChannel2#bufferQueue)
```
```
Task thread -> RemoteInputChannel2#recycle
-> lock(RemoteInputChannel2#bufferQueue) -> bufferQueue#addExclusiveBuffer
-> floatingBuffer#recycleBuffer
-> try to lock(LocalBufferPool#availableMemorySegments)
```
This PR is a second approach to #6254 and solves the deadlock on the
`LocalBufferPool` side as the other solution turned out to be even more complex
than what's already in the PR (I'll update that PR in a second).
@pnowojski and @tillrohrmann can you also have a quick look so that this
can get into 1.5.1?
## Brief change log
- clarify the contract of `BufferListener#notifyBufferAvailable()` (see in
the code)
- make sure that `LocalBufferPool#recycle()` does not break this contract,
i.e. call the listener's callback outside the lock around
`LocalBufferPool#availableMemorySegments`
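The second item can be illustrated with a minimal sketch. This is not the code from the PR: `SketchListener` and `SketchBufferPool` are hypothetical, heavily simplified stand-ins for `BufferListener` and `LocalBufferPool`, and the real classes have different signatures and more state. The sketch only shows the pattern of picking the listener under the lock and invoking its callback after the lock has been released.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for BufferListener; not Flink's actual interface. */
interface SketchListener {
    /** Assumed contract: called WITHOUT the pool lock held, so it may take its own locks. */
    void notifyBufferAvailable(Object buffer);
}

/** Hypothetical, simplified pool that notifies listeners outside its lock. */
class SketchBufferPool {
    // Doubles as the lock object, mirroring availableMemorySegments in the description.
    private final Queue<Object> availableSegments = new ArrayDeque<>();
    private final Queue<SketchListener> registeredListeners = new ArrayDeque<>();

    void addListener(SketchListener listener) {
        synchronized (availableSegments) {
            registeredListeners.add(listener);
        }
    }

    void recycle(Object segment) {
        SketchListener listener;
        synchronized (availableSegments) {
            listener = registeredListeners.poll();
            if (listener == null) {
                // Nobody is waiting: hand the segment back to the pool.
                availableSegments.add(segment);
                return;
            }
        }
        // The pool lock is released here, so the callback can lock its own
        // queue without creating a lock-order inversion with the pool.
        listener.notifyBufferAvailable(segment);
    }

    int numAvailable() {
        synchronized (availableSegments) {
            return availableSegments.size();
        }
    }
}
```

With this shape, a listener that locks its own buffer queue inside `notifyBufferAvailable` never does so while the pool lock is held, which is the ordering the clarified contract asks for.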
## Verifying this change
This change added tests and can be verified as follows:
- added `RemoteInputChannelTest#testConcurrentRecycleAndRelease2` which
catches this deadlock quite quickly
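
As an illustration of how a test can surface such a deadlock, the sketch below runs two actions concurrently and reports whether both finish within a timeout. It is not the test added in this PR: `DeadlockProbe` and `completesWithin` are hypothetical names, and the actual `RemoteInputChannelTest` may be structured quite differently.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: runs two actions concurrently and detects a hang via a timeout. */
final class DeadlockProbe {
    private DeadlockProbe() {}

    /** Returns true if both actions finish within the timeout, false if either hangs. */
    static boolean completesWithin(Runnable first, Runnable second, long timeoutMillis) {
        // Daemon threads, so a genuinely deadlocked pair cannot keep the JVM alive.
        ExecutorService executor = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        });
        try {
            Future<?> firstResult = executor.submit(first);
            Future<?> secondResult = executor.submit(second);
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            firstResult.get(timeoutMillis, TimeUnit.MILLISECONDS);
            long remainingNanos = Math.max(0L, deadline - System.nanoTime());
            secondResult.get(remainingNanos, TimeUnit.NANOSECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // at least one action did not finish: likely blocked
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A test along these lines would typically call the helper in a loop, pairing the recycle path with the release path, since a single run may not hit the critical interleaving.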
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no** (per
buffer, but we're only moving recycling out of the synchronized block so if
there's any effect, it should be positive)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **JavaDocs**
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/NicoK/flink flink-9676-lbp
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6257.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6257
----
commit d69ff968f8647efa13adcc6f338e411675b36d68
Author: Nico Kruber <nico@...>
Date: 2018-07-04T15:45:18Z
[FLINK-9676][network] clarify contracts of
BufferListener#notifyBufferAvailable() and fix a deadlock
When recycling exclusive buffers of a RemoteInputChannel and recycling
(other/floating) buffers to the buffer pool concurrently while the
RemoteInputChannel is registered as a listener to the buffer pool and
adding the exclusive buffer triggers a floating buffer to be recycled back
to the same buffer pool, a deadlock would occur holding locks on
LocalBufferPool#availableMemorySegments and RemoteInputChannel#bufferQueue
but acquiring them in reverse order.
One such instance would be:
Task canceler thread -> RemoteInputChannel1#releaseAllResources -> recycle
floating buffers
-> lock(LocalBufferPool#availableMemorySegments) ->
RemoteInputChannel2#notifyBufferAvailable
-> try to lock(RemoteInputChannel2#bufferQueue)
Task thread -> RemoteInputChannel2#recycle
-> lock(RemoteInputChannel2#bufferQueue) ->
bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer
-> try to lock(LocalBufferPool#availableMemorySegments)
Therefore, we decouple the listener callback from the lock around
LocalBufferPool#availableMemorySegments and implicitly enforce that
RemoteInputChannel2#bufferQueue takes precedence over this lock, i.e. it must
be acquired first and should never be taken after having locked on
LocalBufferPool#availableMemorySegments.
----
> Deadlock during canceling task and recycling exclusive buffer
> -------------------------------------------------------------
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
> Issue Type: Bug
> Components: Network
> Affects Versions: 1.5.0
> Reporter: zhijiang
> Assignee: Nico Kruber
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> A deadlock may occur between the task canceler thread and the task thread.
> The details are as follows:
> Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers
> -> lock(LocalBufferPool#availableMemorySegments) ->
> IC2#notifyBufferAvailable -> try to lock(IC2#bufferQueue)
> Task thread -> IC2#recycle -> lock(IC2#bufferQueue)
> -> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer ->
> try to lock(LocalBufferPool#availableMemorySegments)
> One solution is to call listener#notifyBufferAvailable outside
> the synchronized(availableMemorySegments) block in LocalBufferPool#recycle.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle
> can cover this case, but the probability of hitting the deadlock is very low,
> so this unit test is not stable.