[https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Jarek Potiuk updated AIRFLOW-4401:
----------------------------------
Description:
After discussing potential reasons for the flakiness of the LocalExecutor tests
with [~ash] and [~BasPH], I took a deeper dive into the problem, and what I found
raised the remaining hair on top of my head.
We had a number of flaky LocalExecutor tests in which result_queue was not
empty at a point where it should have been emptied a moment before. More details
and discussion can be found in [https://github.com/apache/airflow/pull/5159].
The problem turned out to be ... the unreliability of the
multiprocessing.Queue.empty() implementation. multiprocessing.Queue.empty() is
not fully synchronized: it might return True even if a put() operation has
already completed in another process. What's more, empty() might return True
even when qsize() of the queue returns > 0 (!)
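To make the race concrete, here is a minimal single-process sketch. It assumes CPython's implementation, in which put() only appends the object to an in-process buffer that a background feeder thread later pickles and writes to a pipe, while empty() merely polls that pipe. count_false_empties is a hypothetical helper; how often empty() is wrong depends on timing and platform, so the sketch counts occurrences rather than predicting them.

{code:python}
import multiprocessing


def count_false_empties(iterations=1000):
    """Hypothetical helper: check empty() right after each completed put().

    Returns how many times empty() reported True immediately after put()
    had returned, plus the items that get() actually received.
    """
    q = multiprocessing.Queue()
    false_empties = 0
    received = []
    for i in range(iterations):
        q.put(i)  # returns once the item is buffered for the feeder thread
        if q.empty():  # only polls the pipe, which may not hold the item yet
            false_empties += 1
        # The item is never lost: get() waits until the feeder thread flushes it.
        received.append(q.get(timeout=5))
    q.close()
    q.join_thread()  # allowed only after close(); waits for the feeder thread
    return false_empties, received


if __name__ == "__main__":
    races, _ = count_false_empties()
    print(f"empty() returned True right after put() in {races} of 1000 runs")
{code}

Every item still arrives through get(); only the answer from empty() can be stale, which is exactly what breaks "drain while not empty()" loops.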
It's a bit mind-boggling, but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug"!!!!)
A few other people have stumbled upon this problem, for example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
and [https://github.com/keras-team/autokeras/issues/368].
It also seems that we have experienced this in Airflow before. In jobs.py there
is a comment from years ago (31.07.2016), shown below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:python}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The workaround suggested in [https://bugs.python.org/issue23582], using qsize(),
works on Linux but is not really acceptable, because qsize() does not work on
macOS (it raises NotImplementedError).
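For reference, a sketch of the qsize()-based check from that bug report, with the macOS limitation made explicit. pending_items is a hypothetical helper, and returning None where qsize() is unsupported is this sketch's own choice; the point is that a portable empty() cannot be built on it.

{code:python}
import multiprocessing
from typing import Optional


def pending_items(q) -> Optional[int]:
    """Hypothetical helper: q.qsize(), or None where it is not implemented.

    On macOS, multiprocessing.Queue.qsize() raises NotImplementedError
    because sem_getvalue() is not implemented there.
    """
    try:
        return q.qsize()
    except NotImplementedError:
        return None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put("task")
    # put() updates the count synchronously, so this reports 1 on Linux even
    # when empty() may still (wrongly) return True; on macOS it reports None.
    print(pending_items(q))
    q.get(timeout=5)
{code}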
Solution 1)
Implement a reliable queue (SynchronizedQueue) based on
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f],
but with a twist: the __init__ of a class deriving from Queue has to be changed
for Python 3.4+, as described in
[https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue].
Luckily, we are now on Python 3.5+.
We should replace every usage of multiprocessing.Queue that relies on empty()
with SynchronizedQueue, and make sure we do not use multiprocessing.Queue in a
similar way in the future.
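A minimal sketch of such a SynchronizedQueue, assuming a shared-counter approach: the item count lives in a multiprocessing.Value, so empty() and qsize() depend neither on polling the pipe nor on sem_getvalue(). It is not the exact code from the linked commit. It shows the Python 3.4+ twist (multiprocessing.queues.Queue.__init__ requires a keyword-only ctx argument, so the subclass passes one explicitly), and it extends __getstate__/__setstate__ so the counter is kept when the queue is pickled for a child process under the spawn start method; that last part is an assumption worth verifying against the Python versions in use.

{code:python}
import multiprocessing
import multiprocessing.queues


class SynchronizedQueue(multiprocessing.queues.Queue):
    """Sketch: a multiprocessing Queue whose empty()/qsize() use a shared counter.

    The counter is incremented before an item is handed to put() and
    decremented after get() returns, so empty() may briefly report items that
    are about to be removed, but never hides an item whose put() completed.
    """

    def __init__(self, maxsize=0):
        ctx = multiprocessing.get_context()
        # Python 3.4+: the base class requires the keyword-only ``ctx`` argument.
        super().__init__(maxsize, ctx=ctx)
        self._size = ctx.Value("i", 0)  # shared memory, with its own lock

    def __getstate__(self):
        # Keep the counter when the queue is pickled for a spawned child.
        return super().__getstate__(), self._size

    def __setstate__(self, state):
        base_state, self._size = state
        super().__setstate__(base_state)

    def put(self, obj, block=True, timeout=None):
        with self._size.get_lock():
            self._size.value += 1
        try:
            super().put(obj, block, timeout)
        except BaseException:
            # put() failed (e.g. queue.Full): undo the increment.
            with self._size.get_lock():
                self._size.value -= 1
            raise

    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)
        with self._size.get_lock():
            self._size.value -= 1
        return item

    def qsize(self):
        return self._size.value

    def empty(self):
        return self.qsize() == 0
{code}

Because put_nowait() and get_nowait() in the base class delegate to put() and get(), they keep the counter consistent as well, and qsize() becomes usable on macOS as a side effect.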
Solution 2)
It seems that this unreliable behaviour of Queue only happens when the Queue is
instantiated directly. The small delays between processes are gone when a shared
Manager (multiprocessing.Manager) is used, because the Queue is then a proxy to
a single central queue running in a separate process, so synchronisation is
implemented in that single queue:
[https://docs.python.org/2/library/multiprocessing.html#pipes-and-queues].
Switching to a managed queue should solve the problem as well.
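A minimal sketch of the managed-queue alternative, assuming the standard multiprocessing.Manager API: the queue lives in the manager's server process and each put()/empty() on the proxy is a synchronous round trip to it, so once put() has returned in a worker, empty() in the parent sees the item. report_result and run_with_managed_queue are hypothetical names; the __main__ guard is required on platforms that spawn processes (macOS, Windows).

{code:python}
import multiprocessing


def report_result(result_queue, value):
    """Hypothetical worker: pushes one result through the proxy and exits."""
    result_queue.put(value)


def run_with_managed_queue(values):
    with multiprocessing.Manager() as manager:
        # Proxy to a queue.Queue living in the manager's server process.
        result_queue = manager.Queue()
        workers = [
            multiprocessing.Process(target=report_result, args=(result_queue, v))
            for v in values
        ]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        # Every put() has completed, so empty() can be trusted to drain the queue.
        results = []
        while not result_queue.empty():
            results.append(result_queue.get())
        return sorted(results)


if __name__ == "__main__":
    print(run_with_managed_queue([1, 2, 3]))  # [1, 2, 3]
{code}

The trade-off is an extra server process and an IPC round trip per operation, in exchange for not having to maintain a custom Queue subclass.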
was:
After discussing potential reasons for the flakiness of the LocalExecutor tests
with [~ash] and [~BasPH], I took a deeper dive into the problem, and what I found
raised the remaining hair on top of my head.
We had a number of flaky LocalExecutor tests in which result_queue was not
empty at a point where it should have been emptied a moment before. More details
and discussion can be found in [https://github.com/apache/airflow/pull/5159].
The problem turned out to be ... the unreliability of the
multiprocessing.Queue.empty() implementation. multiprocessing.Queue.empty() is
not fully synchronized: it might return True even if a put() operation has
already completed in another process. What's more, empty() might return True
even when qsize() of the queue returns > 0 (!)
It's a bit mind-boggling, but it is "as intended", as documented in
[https://bugs.python.org/issue23582] (resolved as "not a bug"!!!!)
A few other people have stumbled upon this problem, for example
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
and [https://github.com/keras-team/autokeras/issues/368].
It also seems that we have experienced this in Airflow before. In jobs.py there
is a comment from years ago (31.07.2016), shown below (but we used
multiprocessing.Queue.empty() nevertheless):
{code:python}
# Not using multiprocessing.Queue() since it's no longer a separate
# process and due to some unusual behavior. (empty() incorrectly
# returns true?){code}
The workaround suggested in [https://bugs.python.org/issue23582], using qsize(),
works on Linux but is not really acceptable, because qsize() does not work on
macOS (it raises NotImplementedError).
The working solution is to implement a reliable queue (SynchronizedQueue) based
on
[https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f],
but with a twist: the __init__ of a class deriving from Queue has to be changed
for Python 3.4+, as described in
[https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue].
Luckily, we are now on Python 3.5+.
We should replace every usage of multiprocessing.Queue that relies on empty()
with SynchronizedQueue, and make sure we do not use multiprocessing.Queue in a
similar way in the future.
> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
> Key: AIRFLOW-4401
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Jarek Potiuk
> Priority: Major
> Fix For: 1.10.4
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)