[ https://issues.apache.org/jira/browse/FLINK-5638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann updated FLINK-5638:
---------------------------------
Fix Version/s: 1.2.0 (was: 1.2.1)
> Deadlock when closing two chained async I/O operators
> -----------------------------------------------------
>
> Key: FLINK-5638
> URL: https://issues.apache.org/jira/browse/FLINK-5638
> Project: Flink
> Issue Type: Bug
> Components: Local Runtime
> Affects Versions: 1.2.0, 1.3.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.3.0
>
>
> The {{AsyncWaitOperator}} can deadlock in a special case: closing two
> chained {{AsyncWaitOperators}} while one element is still in flight between
> the two operators.
> The deadlock scenario is the following. Consider two chained
> {{AsyncWaitOperators}} {{a1}} and {{a2}}. The last element of {{a1}}
> completes. This notifies {{a1's}} {{Emitter}}, {{e1}}, to remove the element
> from the queue and output it to {{a2}}. This poll-and-output operation
> happens under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the
> {{e1}} thread directly calls {{a2's}} {{processElement}} function, which
> tries to add the element to {{a2's}} {{StreamElementQueue}}. Now assume that
> this queue is full. Then the insert operation releases the checkpoint lock
> and waits until it is notified again.
> In the meantime, the {{StreamTask}} calls {{a1.close()}} because all input
> has been consumed. The close operation also runs under the checkpoint lock.
> First, the close method waits on the checkpoint lock until all elements in
> {{a1's}} {{StreamElementQueue}} have been processed, i.e. until the queue is
> empty. Because {{e1}} has already removed the last element, this wait
> returns even though that element is still stuck in {{a2}}. Next, {{e1}} is
> interrupted and the closing thread joins on {{e1}}. At the time of the
> interrupt, {{e1}} is waiting on the checkpoint lock, so it must reacquire
> that lock before it can react to the interrupt. Since the closing thread
> holds the checkpoint lock while it blocks in the join, {{e1}} can never
> regain it, and the two threads deadlock.
> Two problems cause the deadlock:
> 1. We assume that the {{AsyncWaitOperator}} has processed all its elements if
> its queue is empty. This holds as long as the output operation is atomic.
> However, in the chained case the emitter thread may have to wait to insert
> the element into the queue of the next {{AsyncWaitOperator}}. While waiting,
> it releases the checkpoint lock and, thus, the output operation is no longer
> atomic. We can solve this problem by removing the element from the queue
> after it has been output instead of before.
> 2. We interrupt the emitter thread while holding the checkpoint lock and
> never release it. Under these circumstances, the interrupt has no effect,
> because the emitter thread must reacquire the checkpoint lock before it can
> react to the interrupt. We should solve the problem by waiting on the
> checkpoint lock, which releases it, and periodically checking whether the
> emitter thread has stopped.
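The queue-full step in the scenario above relies on an insert that blocks by waiting on the checkpoint lock. The following is a minimal sketch, assuming a simplified bounded queue guarded by a shared lock object; {{LockBoundedQueue}} is a hypothetical name, not Flink's {{StreamElementQueue}} API. It shows that {{Object.wait()}} releases the lock while the producer is blocked, which is what allows another thread (such as the one running {{a1.close()}}) to take the checkpoint lock in the meantime.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified stand-in for a bounded element queue whose insert
// blocks on a shared checkpoint lock. This is NOT Flink's StreamElementQueue.
class LockBoundedQueue<T> {
    private final Object checkpointLock;
    private final ArrayDeque<T> elements = new ArrayDeque<>();
    private final int capacity;

    LockBoundedQueue(Object checkpointLock, int capacity) {
        this.checkpointLock = checkpointLock;
        this.capacity = capacity;
    }

    // Blocks while the queue is full. wait() releases the checkpoint lock
    // until another thread calls notifyAll() on it.
    void put(T element) throws InterruptedException {
        synchronized (checkpointLock) {
            while (elements.size() >= capacity) {
                checkpointLock.wait();
            }
            elements.addLast(element);
            checkpointLock.notifyAll();
        }
    }

    // Removes the head (or returns null) and wakes any blocked producer.
    T poll() {
        synchronized (checkpointLock) {
            T head = elements.pollFirst();
            checkpointLock.notifyAll();
            return head;
        }
    }

    int size() {
        synchronized (checkpointLock) {
            return elements.size();
        }
    }
}
```

A thread that holds the lock and calls {{put}} on a full queue parks in {{wait()}}; any other thread can then enter {{poll()}}, free a slot, and wake it.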
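The core of the deadlock described above can be reproduced with plain Java monitors, independent of Flink. In this sketch a hypothetical {{emitter}} thread plays {{e1}} waiting on the checkpoint lock, and the calling thread plays {{a1.close()}}: it interrupts the emitter and joins while still holding the lock. A timed {{join}} is used only so that the demo terminates; with an untimed {{join()}} the two threads would block forever.

```java
// Plain-Java reproduction of the lock interaction; "emitter" stands in for
// e1 and the calling thread for a1.close(). Names are hypothetical.
class InterruptUnderLockDemo {
    // Returns true if the emitter was still alive after a timed join that was
    // performed while holding the checkpoint lock.
    static boolean emitterStuckWhileLockHeld(long joinMillis) throws InterruptedException {
        final Object checkpointLock = new Object();

        // Emitter: waits on the checkpoint lock, like e1 blocked on a2's full queue.
        Thread emitter = new Thread(() -> {
            synchronized (checkpointLock) {
                try {
                    checkpointLock.wait();
                } catch (InterruptedException expected) {
                    // Thrown only after the emitter has reacquired the lock.
                }
            }
        });
        emitter.start();
        while (emitter.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }

        boolean stuck;
        synchronized (checkpointLock) {
            emitter.interrupt();
            // join() does not release checkpointLock, so the emitter cannot
            // leave wait(); an untimed join() here would never return.
            emitter.join(joinMillis);
            stuck = emitter.isAlive();
        }
        // Once the lock is released the emitter can finish.
        emitter.join();
        return stuck;
    }
}
```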
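Both proposed fixes can be sketched in a few lines. This is an illustrative sketch under simplifying assumptions (a plain {{ArrayDeque}} as the queue and a {{Consumer}} as the downstream output; {{EmitterFixesSketch}}, {{emitHead}} and {{stopEmitter}} are hypothetical names), not the code of the actual fix: fix 1 removes the head element only after the output call returns, and fix 2 replaces {{join()}} with a timed {{wait}} on the checkpoint lock in a loop.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;

// Hypothetical helpers illustrating the two proposed fixes.
class EmitterFixesSketch {
    // Fix 1: keep the element in the queue until output has returned, so an
    // empty queue really means every element was handed downstream.
    static <T> void emitHead(Object checkpointLock, ArrayDeque<T> queue, Consumer<T> output) {
        synchronized (checkpointLock) {
            T head = queue.peekFirst();
            if (head == null) {
                return;
            }
            output.accept(head);        // in the chained case this may wait on the lock
            queue.pollFirst();          // remove only after the output completed
            checkpointLock.notifyAll(); // wake a closer waiting for an empty queue
        }
    }

    // Fix 2: interrupt the emitter, then wait on the checkpoint lock with a
    // timeout instead of join(), so the emitter can reacquire the lock and exit.
    static void stopEmitter(Object checkpointLock, Thread emitter, long pollMillis)
            throws InterruptedException {
        synchronized (checkpointLock) {
            emitter.interrupt();
            while (emitter.isAlive()) {
                checkpointLock.wait(pollMillis); // releases the lock while waiting
            }
        }
    }
}
```

The timed wait is the key design choice: unlike {{Thread.join()}}, {{Object.wait(long)}} gives up the checkpoint lock, so the interrupted emitter can reacquire it, observe the {{InterruptedException}} and terminate, while the closing thread rechecks {{isAlive()}} after each timeout.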
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)