[ https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830133#comment-16830133 ]

ASF GitHub Bot commented on AIRFLOW-4401:
-----------------------------------------

potiuk commented on pull request #5200: [AIRFLOW-4401] Use managers for Queue synchronization
URL: https://github.com/apache/airflow/pull/5200
 
 
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.


> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
>                 Key: AIRFLOW-4401
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Jarek Potiuk
>            Priority: Major
>             Fix For: 1.10.4
>
>
> After discussing potential reasons for the flakiness of the LocalExecutor
> tests (documented, for example, in AIRFLOW-4382) with [~ash] and [~BasPH], I
> took a deeper dive into the problem, and what I found raised the remaining
> hair on top of my head.
> We had a number of flaky LocalExecutor tests in which result_queue was not
> empty when it should have been emptied a moment before. More details and
> discussion can be found in
> [https://github.com/apache/airflow/pull/5159]
> The problem turned out to be the unreliability of the
> multiprocessing.Queue.empty() implementation. It is not fully synchronized:
> it might return True even if a put() operation has already completed in
> another process. What's more, empty() might return True even while qsize()
> of the queue returns > 0 (!)
> It's a bit mind-boggling, but it is "as intended", as documented in
> [https://bugs.python.org/issue23582] (resolved as "not a bug"!), and it is
> described in
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues]
> when you go into the details of how data is synchronized between processes.
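The same buffering can be observed within a single process. A minimal sketch, assuming CPython's standard multiprocessing.Queue: put() hands the object to a background feeder thread and returns, while empty() only polls the underlying pipe, so empty() can report True right after put() has returned. The helper name count_stale_empty is hypothetical, and the count it returns depends on thread scheduling, so it may legitimately be 0 on some runs.

```python
import multiprocessing


def count_stale_empty(trials=100):
    """Count how often empty() reports True immediately after a completed put().

    Hypothetical demo helper: the result depends on thread scheduling and can
    legitimately be 0 on a given run.
    """
    q = multiprocessing.Queue()
    stale = 0
    for _ in range(trials):
        q.put("item")      # returns once the item is in the feeder thread's buffer
        if q.empty():      # polls the pipe, which the feeder may not have written yet
            stale += 1
        q.get(timeout=5)   # a blocking get() still receives the item
    q.close()
    q.join_thread()        # wait for the feeder thread to flush and exit
    return stale


if __name__ == "__main__":
    print(f"empty() was True right after put() in {count_stale_empty()} of 100 trials")
```

A blocking get() with a timeout never misses the item; only the non-blocking empty() check can be stale.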
> A few other people have stumbled upon this problem, for example
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> and [https://github.com/keras-team/autokeras/issues/368]
> It also seems that we experienced this in Airflow before. In jobs.py, years
> ago (31.07.2016), we can see the comment below (yet we used
> multiprocessing.Queue.empty() nevertheless):
> {code:python}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?)
> {code}
> The workaround suggested in [https://bugs.python.org/issue23582], using
> qsize(), works on Linux but is not really acceptable, because qsize() is not
> implemented on MacOS (it raises NotImplementedError).
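A sketch of that qsize()-based check, with a fallback to empty() where qsize() is not implemented; queue_is_empty is a hypothetical helper name. In CPython, qsize() is derived from a semaphore that put() acquires synchronously, which is why it agrees with completed put() calls on Linux. The fallback path brings the original unreliability back, which is the reason this workaround was not considered acceptable.

```python
import multiprocessing


def queue_is_empty(q):
    """Hypothetical helper: prefer qsize(), fall back to empty() where unsupported."""
    try:
        # qsize() counts completed put() calls via a semaphore, so it does not
        # lag behind the feeder thread the way empty() can.
        return q.qsize() == 0
    except NotImplementedError:
        # macOS: sem_getvalue() is unavailable, so qsize() raises; empty() is
        # the only option left and it keeps the original unreliability.
        return q.empty()
```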
>  
> *Proposed solution 1): Synchronized Queue*
> [https://github.com/apache/airflow/pull/5199]
> Implement a more reliable queue (SynchronizedQueue) based on
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> (we would have to adjust the initialisation to match both Python 2.7 and
> 3.5+ syntax, since we want to backport the fix to the stable v1.10 release).
> We should replace every usage of multiprocessing.Queue where empty() is used
> with SynchronizedQueue, and make sure we do not use multiprocessing.Queue in
> a similar way in the future.
> Pros:
>  * rather straightforward replacement of Queue with SynchronizedQueue
>  * no extra processes needed: queues continue to be distributed, without a
> central manager
>  * no extra processes to clean up
> Cons:
>  * potential synchronization delays (likely negligible)
>  * we are adding our own SynchronizedQueue with slightly altered behaviour,
> which means more code to maintain
>  * the SynchronizedQueue implementation is still not fully reliable: there
> can be cases where empty() returns False but get_nowait() raises an Empty
> exception. This means that everywhere we rely on empty() returning False, we
> have to use a potentially blocking get() to retrieve data
>  * requires (simple) backporting to Python 2 for the v1-10 branch
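A minimal Python 3 sketch of such a SynchronizedQueue, following the shared-counter idea of the lemon commit linked above; this is an illustration, not the code from the Airflow PR. It assumes CPython's multiprocessing.queues.Queue, whose Python 3 constructor requires a keyword-only ctx argument (presumably the initialisation difference from 2.7 mentioned above). The counter is a multiprocessing.Value, and __getstate__/__setstate__ carry it along when the queue is pickled for a spawned child process; the _add helper is a hypothetical name.

```python
import multiprocessing
import multiprocessing.queues
from queue import Full


class SynchronizedQueue(multiprocessing.queues.Queue):
    """Queue whose qsize()/empty() are driven by a shared counter (sketch)."""

    def __init__(self, maxsize=0):
        # Python 3's multiprocessing.queues.Queue requires an explicit ctx.
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        self._size = multiprocessing.Value("i", 0)

    def __getstate__(self):
        # Carry the shared counter along when the queue is sent to a child process.
        return super().__getstate__(), self._size

    def __setstate__(self, state):
        parent_state, self._size = state
        super().__setstate__(parent_state)

    def _add(self, n):
        with self._size.get_lock():
            self._size.value += n

    def put(self, obj, block=True, timeout=None):
        self._add(1)  # count first, so the size can never go negative
        try:
            super().put(obj, block, timeout)
        except Full:
            self._add(-1)  # undo the count if the item was not enqueued
            raise

    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)  # may raise queue.Empty
        self._add(-1)  # only count items that were actually received
        return item

    def qsize(self):
        return self._size.value

    def empty(self):
        return self.qsize() == 0
```

Because put() bumps the counter before the feeder thread has written the item to the pipe, empty() can return False while get_nowait() (which calls get(False)) still raises queue.Empty; that is the remaining unreliability listed in the cons, and the reason consumers should prefer a blocking get(timeout=...).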
> *Proposed solution 2): Use managed queues*
> [https://github.com/apache/airflow/pull/5200]
> It seems that this unreliable behaviour only happens when the Queue is
> instantiated directly; the small delays between processes are gone when a
> shared Manager is used. In that case the queue is really a proxy to a central
> Queue object living in a separate manager process, so synchronisation is
> implemented fully via this single central queue:
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues].
> Using managed queues should solve the problem.
> Observations from test runs confirm that this is the case: the tests are no
> longer flaky when managed queues are used.
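A minimal sketch of the managed-queue approach using the standard library's multiprocessing.Manager; produce and run_managed_demo are hypothetical names. Each put() on the proxy is a synchronous round trip to the manager process, so once the producer process has exited, empty() and get_nowait() in the parent agree with everything it put. Under the spawn start method (the default on macOS and Windows), the entry point must stay under the `if __name__ == "__main__":` guard.

```python
import multiprocessing


def produce(q, n):
    """Runs in a child process and puts n integers on the managed queue."""
    for i in range(n):
        q.put(i)  # synchronous call into the manager process


def run_managed_demo(n=5):
    with multiprocessing.Manager() as manager:
        # A proxy for a queue.Queue that lives inside the manager process.
        q = manager.Queue()
        p = multiprocessing.Process(target=produce, args=(q, n))
        p.start()
        p.join()
        # All puts have reached the central queue, so empty() can be trusted.
        items = []
        while not q.empty():
            items.append(q.get_nowait())
    return items


if __name__ == "__main__":
    print(run_managed_demo())
```

Only the queue construction differs from the direct multiprocessing.Queue() version; the consuming code stays the same.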
> Pros:
>  * only the initialisation of queues needs to change; there is no need to
> extend the Queue implementation
>  * Pythonic: managers are part of the standard library, so we can assume they
> are reliable and well tested
>  * such a managed queue is fully reliable: empty() and get_nowait() are
> perfectly in sync
>  * works the same for Python 2 and Python 3
> Cons:
>  * potential synchronization delays (likely negligible)
>  * since a separate process is started for each manager, cleanup is
> necessary and quite delicate, because shutting down the manager makes the
> queue inaccessible (Broken Pipe errors). The cleanup sequence is therefore
> important: first process everything, then clean up. This might have some
> undesirable side effects when shutting down Schedulers/Workers
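A sketch of that cleanup order, assuming a queue obtained from multiprocessing.Manager(); drain_then_shutdown is a hypothetical helper, not Airflow code. Everything is consumed first and the manager is shut down last, because once the manager process is gone any further call on the proxy fails (on Linux typically with BrokenPipeError).

```python
import multiprocessing


def drain_then_shutdown(manager, q):
    """Hypothetical helper: consume everything first, then stop the manager."""
    items = []
    while not q.empty():  # reliable for a managed queue
        items.append(q.get_nowait())
    manager.shutdown()  # the proxy `q` is unusable after this point
    return items


if __name__ == "__main__":
    m = multiprocessing.Manager()
    queue_proxy = m.Queue()
    for i in range(3):
        queue_proxy.put(i)
    print(drain_then_shutdown(m, queue_proxy))
```

Reversing the two steps (shutting the manager down while a scheduler or worker still expects to read from the queue) is what produces the Broken Pipe errors mentioned above.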



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
