[ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16828944#comment-16828944
 ] 

Jarek Potiuk commented on AIRFLOW-4401:
---------------------------------------

[~ash] and [~BasPH]: Both candidate solutions are now green in Travis and I
think both solve the problem. So it's a good time to review them and decide
which one to choose.
 * [https://github.com/apache/airflow/pull/5199]
 * [https://github.com/apache/airflow/pull/5200]

> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
>                 Key: AIRFLOW-4401
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Jarek Potiuk
>            Priority: Major
>             Fix For: 1.10.4
>
>
> After discussing potential reasons for the flakiness of LocalExecutor tests
> (documented, for example, in AIRFLOW-4382) with [~ash] and [~BasPH], I took a
> deeper dive into the problem, and what I found raised the remaining hair on
> top of my head.
> We had a number of flaky LocalExecutor tests in which result_queue was not
> empty at a point where it should have been emptied a moment before. More
> details and discussion can be found in
> [https://github.com/apache/airflow/pull/5159]
> The problem turned out to be the unreliability of the
> multiprocessing.Queue.empty() implementation. It is not fully synchronized:
> it might return True even if a put() operation has already completed in
> another process. What's more, empty() might return True even if qsize() of
> the same queue returns > 0 (!) The reason is that put() only appends the
> object to an internal buffer, and a background feeder thread later flushes
> it into the underlying pipe, which is the only thing empty() checks.
> It's a bit mind-boggling, but it is "as intended", as documented in
> [https://bugs.python.org/issue23582] (resolved as "not a bug"!), and it is
> described in
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues]
> when you go into the details of how data is synchronized between processes.
> A few people have stumbled upon this problem. For example 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  and [https://github.com/keras-team/autokeras/issues/368] 
> We also seem to have experienced this in Airflow before. In jobs.py, years
> ago (31.07.2016), we can see the comment below (but we used
> multiprocessing.Queue.empty() nevertheless):
> {code:python}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?)
> {code}
> The workaround suggested in [https://bugs.python.org/issue23582], which uses
> qsize(), works on Linux but is not really acceptable, because qsize() does
> not work on macOS (it raises NotImplementedError).
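A minimal sketch of the qsize()-based check, assuming CPython's multiprocessing.Queue, where put() updates an internal semaphore before the feeder thread flushes anything into the pipe. The helper name `qsize_says_empty` is hypothetical; it returns None where qsize() is unsupported, which is exactly why this workaround is not portable.

```python
import multiprocessing


def qsize_says_empty(q):
    """Hypothetical qsize()-based emptiness check (workaround from bpo-23582).

    Returns True/False, or None where qsize() is unsupported (macOS).
    """
    try:
        # qsize() reads the queue's internal semaphore, which put() updates
        # synchronously, so it does not wait for the feeder thread to flush
        # the item into the pipe (the pipe is all that empty() looks at).
        return q.qsize() == 0
    except NotImplementedError:
        # macOS: sem_getvalue() is not implemented, so qsize() raises.
        return None


if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put("item")
    # On Linux this can print "True False": empty() and qsize() disagree.
    print(q.empty(), qsize_says_empty(q))
```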
>  
> *Proposed solution 1) Synchronized Queue*
> [https://github.com/apache/airflow/pull/5199]
> Implement a more reliable queue (SynchronizedQueue) based on 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> (but we have to adjust the initialisation to match both the Python 2.7 and
> 3.5+ syntax, since we want to backport it to the stable v1.10 release).
> We should replace all usages of multiprocessing.Queue where empty() is used
> with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
> in a similar way in the future.
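A minimal Python 3 only sketch of a shared-counter SynchronizedQueue, under the assumption that a counter in shared memory (multiprocessing.Value), updated synchronously in put() and get(), is what empty() and qsize() should report. This is not the code from PR 5199 or the lemon commit, and it omits the Python 2.7 compatibility the PR needs.

```python
import multiprocessing
import multiprocessing.queues


class SynchronizedQueue(multiprocessing.queues.Queue):
    """Sketch: a Queue whose empty()/qsize() use a shared counter.

    The counter does not depend on the feeder thread having flushed data
    into the pipe, so empty() turns False as soon as put() is called.
    """

    def __init__(self, maxsize=0):
        # Python 3 requires an explicit context for the Queue base class.
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        self._size = multiprocessing.Value("i", 0)

    # Queue only pickles its own fields when handed to a child process,
    # so the shared counter has to be added to that state explicitly.
    def __getstate__(self):
        return super().__getstate__(), self._size

    def __setstate__(self, state):
        parent_state, self._size = state
        super().__setstate__(parent_state)

    def _add(self, delta):
        with self._size.get_lock():
            self._size.value += delta

    def put(self, obj, block=True, timeout=None):
        self._add(1)  # count first, so empty() turns False immediately
        try:
            super().put(obj, block, timeout)
        except BaseException:
            self._add(-1)  # e.g. queue.Full: the item never entered the queue
            raise

    def get(self, block=True, timeout=None):
        # Base get_nowait() calls self.get(False), so it goes through here too.
        item = super().get(block, timeout)  # may raise queue.Empty
        self._add(-1)
        return item

    def qsize(self):
        return self._size.value

    def empty(self):
        return self._size.value == 0
```

Because the counter is incremented before the item reaches the pipe, empty() can already be False while get_nowait() still raises Empty, which is the remaining unreliability listed in the cons.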
> Pros:
>  * rather straightforward replacement of queue -> SynchronizedQueue
>  * no extra processes needed - queues continue to be distributed without a
> central manager
>  * no need to cleanup the processes
> Cons:
>  * potential synchronization delays (likely negligible)
>  * we are adding our own SynchronizedQueue with slightly altered behaviour - 
> more code to manage
>  * the SynchronizedQueue implementation is still not fully reliable: you can
> have cases where empty() returns False but get_nowait() raises the Empty
> exception. This means that everywhere we rely on empty() returning False, we
> have to use a potentially blocking get() to retrieve data
>  * requires a (simple) backport to Python 2 for the v1-10 branch
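The consequence in the cons list above (use a blocking get() wherever we rely on empty() returning False) can be sketched as a small duck-typed helper. The name `drain` and its `timeout` parameter are hypothetical; it works with any queue exposing empty() and get(block, timeout), and the test uses the thread-safe queue.Queue only because it behaves deterministically.

```python
import queue


def drain(q, timeout=None):
    """Hypothetical helper: collect items while q.empty() reports False.

    Uses a blocking get() instead of get_nowait(): with a counter-based
    queue, empty() can be False while the item is still in flight, so
    get_nowait() could spuriously raise queue.Empty.
    """
    items = []
    while not q.empty():
        try:
            items.append(q.get(block=True, timeout=timeout))
        except queue.Empty:
            # Only reachable when a timeout is given and it expires.
            break
    return items
```

Passing a timeout trades the risk of blocking forever (for example, if a producer died mid-put()) for the risk of leaving an item behind.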
> *Proposed solution 2) Use managed queues*
> [https://github.com/apache/airflow/pull/5200]
> It seems that this unreliable behaviour only happens when the Queue is
> instantiated directly; the small delays between processes are gone when a
> Manager (multiprocessing.Manager()) is used. In that case the Queue is really
> a proxy to a central Queue object running in a separate (manager) process, so
> synchronisation is implemented entirely via this single central queue:
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] .
> Using managed queues should solve the problem.
> Observations from test runs confirm this: the tests are no longer flaky when
> managed queues are used.
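A minimal sketch of the managed-queue variant, assuming worker processes report results through a `manager.Queue()` proxy. The names `square` and `collect_squares` are hypothetical, not Airflow's LocalExecutor code; the optional `ctx` argument only selects the start method. Each proxy call is a round trip to the manager process, so once put() returns the item is already in the central queue.

```python
import multiprocessing


def square(n, result_queue):
    # Runs in a child process; put() is a synchronous call to the manager.
    result_queue.put(n * n)


def collect_squares(values, ctx=None):
    """Hypothetical example: run one child per value, collect via a managed queue."""
    ctx = ctx if ctx is not None else multiprocessing.get_context()
    manager = ctx.Manager()  # starts a separate manager server process
    try:
        result_queue = manager.Queue()  # proxy to a queue.Queue in the manager
        workers = [ctx.Process(target=square, args=(n, result_queue))
                   for n in values]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        results = []
        # Every put() has completed inside the manager, so empty() and
        # get_nowait() agree here.
        while not result_queue.empty():
            results.append(result_queue.get_nowait())
        return sorted(results)
    finally:
        # Shut down only after everything has been read (see cleanup below).
        manager.shutdown()
```

With the spawn start method (the default on macOS and Windows), call `collect_squares` from under an `if __name__ == "__main__":` guard.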
> Pros:
>  * Only initialisation of queues needs to be changed - no need to extend 
> Queue implementation
>  * Pythonic way - managers are part of standard library and we can assume 
> they are reliable and tested
>  * Such a managed queue is fully reliable - empty() and get_nowait() are
> perfectly in sync.
>  * Works the same on Python 2 and Python 3
> Cons:
>  * potential synchronization delays (likely negligible)
>  * since we have a separate process started for each manager, cleanup is
> necessary, and it is quite delicate, because shutting down the manager
> prevents any further access to the queue (Broken Pipe errors). Therefore the
> order of cleanup is important: first process everything, then clean up. This
> might have some undesirable side effects when shutting down
> Schedulers/Workers.
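The cleanup ordering described in the last con can be sketched with the manager used as a context manager, whose exit calls shutdown(). The helper `process_then_shutdown` is hypothetical; it consumes everything while the manager is alive and then shows that touching the proxy afterwards fails, which on Linux we expect to surface as BrokenPipeError (an OSError) or EOFError.

```python
import multiprocessing


def process_then_shutdown(items, ctx=None):
    """Hypothetical helper: use the managed queue only while the manager lives."""
    ctx = ctx if ctx is not None else multiprocessing.get_context()
    with ctx.Manager() as manager:  # leaving the block calls manager.shutdown()
        result_queue = manager.Queue()
        for item in items:
            result_queue.put(item)
        # 1) Process everything while the manager process is still running.
        processed = []
        while not result_queue.empty():
            processed.append(result_queue.get_nowait())
    # 2) The manager process is gone now; the proxy can no longer be used.
    try:
        result_queue.put("too late")
    except (OSError, EOFError):
        late_access_failed = True
    else:
        late_access_failed = False
    return processed, late_access_failed
```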



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
