[
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jarek Potiuk resolved AIRFLOW-4401.
-----------------------------------
Resolution: Fixed
Fix Version/s: 2.0.0
> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
> Key: AIRFLOW-4401
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Jarek Potiuk
> Priority: Major
> Fix For: 2.0.0
>
>
> After discussing with [~ash] and [~BasPH] potential reasons for flakiness of
> LocalExecutor tests, I took a deeper dive into the problem and what I found
> raised the remaining hair on top of my head.
> We had a number of flaky tests in the local executor that resulted in
> result_queue not being empty where it should have been emptied a moment
> before. More details and discussion can be found in
> [https://github.com/apache/airflow/pull/5159]
> The problem turned out to be the unreliability of the multiprocessing.Queue
> empty() implementation. multiprocessing.Queue.empty() is not fully
> synchronized, so it might return True even if a put() operation has already
> completed in another process. What's more, empty() might return True even
> if qsize() of the queue returns > 0 (!)
> It's a bit mind-boggling, but it is "as intended", as documented in
> [https://bugs.python.org/issue23582] (resolved as "not a bug" !!!!)
> A few people have stumbled upon this problem. For example
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> and [https://github.com/keras-team/autokeras/issues/368]
> We also seem to have experienced this in Airflow before. In jobs.py years ago
> (31.07.2016) we can see the comment below (but we used
> multiprocessing.Queue.empty() nevertheless):
> {code:java}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?){code}
> The solution available in [https://bugs.python.org/issue23582] using qsize()
> worked on Linux but is not really acceptable because qsize() does not
> work on macOS (it throws NotImplementedError).
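
To illustrate the portability problem described above, here is a minimal sketch. The helper name qsize_or_none is hypothetical and is not part of Airflow or of the linked Python issue. It only shows that code relying on qsize() has to handle NotImplementedError, which is why qsize() alone cannot back a portable emptiness check.

```python
import multiprocessing


def qsize_or_none(q):
    """Return q.qsize(), or None where the platform does not support it.

    Hypothetical helper, for illustration only.
    """
    try:
        return q.qsize()
    except NotImplementedError:
        # Raised on platforms where sem_getvalue() is not implemented,
        # such as macOS.
        return None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put("task-1")
    q.put("task-2")

    size = qsize_or_none(q)
    if size is None:
        print("qsize() is not available on this platform")
    else:
        print("qsize() reports", size, "item(s)")

    # Drain with blocking gets instead of polling empty().
    for _ in range(2):
        q.get(timeout=5)
```

On Linux the helper returns the item count; on macOS it is expected to return None, so a caller would still need another way to decide whether the queue is empty.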
> The working solution is to implement a reliable queue (SynchronizedQueue)
> based on
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> (but with a twist: __init__ of a class deriving from Queue has to be
> changed for Python 3.4+, as described in
> [https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).
> Luckily we are now on Python 3.5+.
> We should replace all usages of multiprocessing.Queue where empty() is used
> with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
> in a similar way in the future.
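
A rough sketch of what such a queue could look like follows. It is an assumption-laden illustration in the spirit of the linked lemon commit, not the code that was merged into Airflow: the shared counter attribute _count, the pickling overrides, and the error handling in put() are choices made for this example. The idea is to keep an explicit item counter in shared memory and answer empty() from that counter instead of polling the underlying pipe.

```python
import multiprocessing
from multiprocessing.queues import Queue


class SynchronizedQueue(Queue):
    """Sketch: a multiprocessing queue whose empty() uses a shared counter."""

    def __init__(self, maxsize=0, *, ctx=None):
        # On Python 3.4+, multiprocessing.queues.Queue needs an explicit ctx.
        if ctx is None:
            ctx = multiprocessing.get_context()
        super().__init__(maxsize, ctx=ctx)
        # Shared integer counting items that were put but not yet fetched.
        self._count = ctx.Value("i", 0)

    def __getstate__(self):
        # Queue pickles only its own fields, so carry the counter along
        # when the queue is handed to a spawned child process.
        return (super().__getstate__(), self._count)

    def __setstate__(self, state):
        parent_state, self._count = state
        super().__setstate__(parent_state)

    def put(self, *args, **kwargs):
        # Count first, so empty() is already False once put() returns.
        with self._count.get_lock():
            self._count.value += 1
        try:
            super().put(*args, **kwargs)
        except Exception:
            # Undo the increment if the item was not accepted (e.g. Full).
            with self._count.get_lock():
                self._count.value -= 1
            raise

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        with self._count.get_lock():
            self._count.value -= 1
        return item

    def qsize(self):
        # Does not rely on sem_getvalue(), so it should also work on macOS.
        return self._count.value

    def empty(self):
        return self.qsize() == 0


if __name__ == "__main__":
    q = SynchronizedQueue()
    q.put("result")
    print(q.empty())
    print(q.get(timeout=5))
    print(q.empty())
```

Note that a False result from empty() only means an item has been put and not yet fetched; a get() with a very short timeout can still raise queue.Empty while the item is in transit through the feeder thread.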
>
>
>