Till Rohrmann created FLINK-5638:
------------------------------------

             Summary: Deadlock when closing two chained async I/O operators
                 Key: FLINK-5638
                 URL: https://issues.apache.org/jira/browse/FLINK-5638
             Project: Flink
          Issue Type: Bug
          Components: Local Runtime
    Affects Versions: 1.2.0, 1.3.0
            Reporter: Till Rohrmann
            Assignee: Till Rohrmann
             Fix For: 1.3.0, 1.2.1


The {{AsyncWaitOperator}} can deadlock in a special case: when two chained 
{{AsyncWaitOperators}} are closed while one element is still in flight between 
them.

The deadlock scenario is the following. Consider two chained 
{{AsyncWaitOperators}}, {{a1}} and {{a2}}. When {{a1}}'s last element 
completes, {{a1}}'s {{Emitter}}, {{e1}}, is notified to remove the element 
from the queue and output it to {{a2}}. This poll-and-output operation happens 
under the checkpoint lock. Since {{a1}} and {{a2}} are chained, the {{e1}} 
thread directly calls {{a2}}'s {{processElement}} function, which tries to add 
the element to {{a2}}'s {{StreamElementQueue}}. Now assume that this queue is 
full. The operation then releases the checkpoint lock and waits until it is 
notified again.
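To make the "release the lock and wait" step concrete, here is a minimal sketch assuming the queue is guarded by a plain Java monitor used as the checkpoint lock. {{BoundedQueueSketch}}, {{put}}, {{poll}} and {{demo}} are hypothetical names, not Flink's {{StreamElementQueue}} API. The demo shows that a producer blocked in {{put}} on a full queue has released the lock, so another thread can acquire it in the meantime.

```java
import java.util.ArrayDeque;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a bounded StreamElementQueue; not Flink's implementation.
final class BoundedQueueSketch {
    private final Object checkpointLock;
    private final ArrayDeque<Integer> elements = new ArrayDeque<>();
    private final int capacity;

    BoundedQueueSketch(Object checkpointLock, int capacity) {
        this.checkpointLock = checkpointLock;
        this.capacity = capacity;
    }

    // Caller must hold checkpointLock. While the queue is full, wait() releases
    // the lock, so the caller's surrounding "poll and output" step is not atomic.
    void put(int element) throws InterruptedException {
        while (elements.size() >= capacity) {
            checkpointLock.wait();
        }
        elements.add(element);
        checkpointLock.notifyAll();
    }

    // Caller must hold checkpointLock. Frees a slot and wakes blocked producers.
    Integer poll() {
        Integer head = elements.poll();
        checkpointLock.notifyAll();
        return head;
    }

    int size() {
        return elements.size();
    }

    // Demo: a producer blocks in put() on a full queue, yet another thread can
    // still acquire the checkpoint lock. Returns the final queue size.
    static int demo() throws InterruptedException {
        Object lock = new Object();
        BoundedQueueSketch queue = new BoundedQueueSketch(lock, 1);
        synchronized (lock) {
            queue.put(1); // the queue is now full
        }
        AtomicBoolean entered = new AtomicBoolean(false);
        Thread producer = new Thread(() -> {
            synchronized (lock) {
                entered.set(true);
                try {
                    queue.put(2); // blocks in wait(), releasing the lock
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        while (!entered.get()) {
            Thread.sleep(1);
        }
        // Acquiring the lock succeeds only because the producer released it in wait().
        synchronized (lock) {
            queue.poll(); // removes element 1 and notifies the producer
        }
        producer.join();
        synchronized (lock) {
            return queue.size(); // element 2 is the only one left
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints 1
    }
}
```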

In the meantime, the {{StreamTask}} calls {{a1.close()}} because all input has 
been consumed. The close operation also happens under the checkpoint lock. 
First, the close method waits until all elements in the {{StreamElementQueue}} 
have been processed (i.e. the queue is empty). This happens by waiting on the 
checkpoint lock. Since {{e1}} polled the element before outputting it, the 
queue is already empty and this wait returns. Next, {{e1}} is interrupted and 
we join on {{e1}}. At this point {{e1}} is waiting on the checkpoint lock inside 
{{a2}}'s {{processElement}}, so it must re-acquire the lock before it can react 
to the interrupt. Since the closing thread does not release the checkpoint lock 
while joining, {{e1}} can never regain it, and voilà, we have a deadlock.
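The interrupt-and-join step can be reproduced in isolation with a small sketch, assuming the checkpoint lock is a plain Java monitor. {{InterruptUnderLockDemo}} and {{emitterStuckWhileLockHeld}} are hypothetical names. The sketch uses a bounded {{join(200)}} so it terminates; with an unbounded {{join()}}, as in the scenario above, the closing thread would hang forever.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reproduction of the close() pattern described above; not Flink code.
final class InterruptUnderLockDemo {

    // Returns true if the emitter is still alive after a bounded join performed
    // while the closing thread holds the checkpoint lock.
    static boolean emitterStuckWhileLockHeld() throws InterruptedException {
        Object checkpointLock = new Object();
        AtomicBoolean waiting = new AtomicBoolean(false);
        Thread emitter = new Thread(() -> {
            synchronized (checkpointLock) {
                waiting.set(true);
                try {
                    while (true) {
                        // Stands in for e1 blocked inside a2's processElement().
                        checkpointLock.wait();
                    }
                } catch (InterruptedException e) {
                    // Reached only after the emitter has re-acquired checkpointLock.
                }
            }
        });
        emitter.start();
        while (!waiting.get()) {
            Thread.sleep(1);
        }
        boolean stuck;
        synchronized (checkpointLock) {  // like a1.close(): hold the lock,
            emitter.interrupt();         // interrupt e1,
            emitter.join(200);           // and join (bounded here, unbounded in the bug)
            stuck = emitter.isAlive();   // e1 cannot leave wait() without the lock
        }
        emitter.join(); // once the lock is released, the emitter terminates
        return stuck;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(emitterStuckWhileLockHeld()); // prints true
    }
}
```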

Two problems cause the deadlock:

1. We assume that the {{AsyncWaitOperator}} has processed all its elements if 
its queue is empty. This holds as long as the output operation is atomic. 
However, in the chained case the emitter thread may have to wait to insert the 
element into the queue of the next {{AsyncWaitOperator}}. Under these 
circumstances, it releases the checkpoint lock and, thus, the output operation 
is no longer atomic. We can solve this problem by removing the last element 
from the queue only after it has been output, instead of before.

2. We interrupt the emitter thread while holding the checkpoint lock and never 
release it. Under these circumstances, the interrupt signal is useless because 
the emitter thread must re-acquire the checkpoint lock before it can react to 
the interrupt. We can solve this problem by waiting on the checkpoint lock 
(which temporarily releases it) and periodically checking whether the emitter 
thread has terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
