[ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826880#comment-16826880
 ] 

Jarek Potiuk commented on AIRFLOW-4401:
---------------------------------------

I think we simply have to dig a bit deeper :) The solution we had only 
helped to reliably see whether the queue has already been emptied. But as a 
side effect the reverse case is no longer handled properly :(. `if not empty()` 
now seems unreliable, and we implicitly relied on it in at least one place 
([https://github.com/apache/airflow/blob/master/airflow/jobs.py#L490]) by 
calling `if not empty()` immediately followed by get_nowait().

From a quick look it seems that the problem we see in AIRFLOW-4416 is that 
SynchronizedQueue.empty() might return False (because counters/sizes are 
synchronized already) but then get_nowait() will still raise an Empty 
exception. This happens when the actual object to get has not yet been 
synchronized between processes/read from the Pipe. The original empty() 
implementation from Queue was reliable in this case (i.e. when it returned 
False, you could be sure there was something to get immediately, because it 
checked whether something was already waiting in the pipe). And we seem to 
have implicitly relied on that. That's why we never had this Empty exception 
raised in this place, I believe.

One solution (maybe) could be to use get() rather than get_nowait() in this 
case; another is to call super().empty() in the SynchronizedQueue and 
double-check, but then we need to decide what to do if the two methods 
disagree. But I have the feeling that it's just a band-aid and that we need 
to at least check how the other queues are used.
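
To make the first option concrete, here is a minimal sketch (not the actual 
jobs.py code). The helper name `get_if_available` and the 0.1 s timeout are 
assumptions made for illustration only. It uses get() with a short timeout 
instead of `if not empty()` followed by get_nowait(), and treats Empty as 
"nothing readable yet" rather than as an error:

```python
import queue  # multiprocessing.Queue raises queue.Empty as well


def get_if_available(q, timeout=0.1):
    """Return (True, item) if an item arrives within `timeout` seconds,
    otherwise (False, None).

    Hypothetical helper: it never consults q.empty(), so it does not
    depend on empty() and the pipe contents agreeing with each other.
    """
    try:
        return True, q.get(timeout=timeout)
    except queue.Empty:
        # The item may still be in flight between processes; the caller
        # can simply retry on its next loop iteration.
        return False, None
```

The trade-off is that the caller may block for up to `timeout` seconds on 
each call, so the value has to be picked with the loop frequency in mind.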

Knowing that empty() is inherently unreliable - maybe you are right [~ash] that 
a good solution might be to get some other synchronisation mechanism to check 
if we are actually "done". We should definitely take a look at that.
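
One common pattern for such a "done" signal is a sentinel value that each 
worker puts on the result queue as its last item. The sketch below only 
illustrates that idea and is not a proposal for the actual executor code; 
`DONE`, `worker` and `collect` are hypothetical names, and the 5 s timeout is 
an arbitrary assumption:

```python
import multiprocessing

DONE = None  # hypothetical sentinel; assumes None is never a real result


def worker(result_queue, values):
    """Put one result per input value, then signal completion."""
    for value in values:
        result_queue.put(value * 2)
    result_queue.put(DONE)


def collect(result_queue, n_workers, timeout=5.0):
    """Read results until every worker has sent its sentinel.

    Blocking get() is used instead of empty(); queue.Empty is raised
    if nothing arrives within `timeout` seconds (e.g. a worker died).
    """
    results = []
    finished = 0
    while finished < n_workers:
        item = result_queue.get(timeout=timeout)
        if item is DONE:
            finished += 1
        else:
            results.append(item)
    return results


if __name__ == "__main__":
    q = multiprocessing.Queue()
    procs = [
        multiprocessing.Process(target=worker, args=(q, chunk))
        for chunk in ([1, 2], [3])
    ]
    for p in procs:
        p.start()
    print(sorted(collect(q, n_workers=len(procs))))
    for p in procs:
        p.join()
```

With this approach the consumer knows it is finished because it has counted 
the sentinels, not because empty() happened to return True.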

Maybe we can join efforts with [~BasPH] and cross-check our findings over the 
weekend as we did last time. 

 

> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
>                 Key: AIRFLOW-4401
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Jarek Potiuk
>            Priority: Major
>             Fix For: 1.10.4
>
>
> After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
> LocalExecutor tests, I took a deeper dive into the problem and what I found 
> raised the remaining hair on top of my head. 
> We had a number of flaky tests in the local executor that resulted in 
> result_queue not being empty where it should have been emptied a moment 
> before. More details and discussion can be found in 
> [https://github.com/apache/airflow/pull/5159] 
> The problem turned out to be ... unreliability of multiprocessing.Queue 
> empty() implementation. It turned out that multiprocessing.Queue.empty() 
> implementation is not fully synchronized and it might return True even if 
> put() operation has been already completed in another process. What's more - 
> empty() might return True even if qsize() of the queue returns > 0 (!) 
> It's a bit mind-boggling but it is "as intended", as documented in 
> [https://bugs.python.org/issue23582]  (resolved as "not a bug" !!!!) 
> A few people have stumbled upon this problem. For example 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  and [https://github.com/keras-team/autokeras/issues/368] 
> Also we seem to have experienced that in Airflow before. In jobs.py years ago 
> (31.07.2016) - we can see the comment below (but we used 
> multiprocessing.Queue.empty() nevertheless):
> {code:java}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?){code}
> The solution available in [https://bugs.python.org/issue23582]  using qsize() 
> was working on Linux but is not really acceptable because qsize() does not 
> work on MacOS (throws NotImplementedError).
> The working solution is to implement a reliable queue (SynchronizedQueue) 
> based on 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  (but with a twist: the __init__ of a class deriving from Queue has to be 
> changed for Python 3.4+ as described in 
> [https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).
> Luckily we are now on Python 3.5+.
> We should replace all usages of multiprocessing.Queue where empty() is used 
> with the SynchronizedQueue. And make sure we do not use multiprocessing.Queue 
> in a similar way in the future.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
