[ 
https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830209#comment-16830209
 ] 

ASF GitHub Bot commented on AIRFLOW-4401:
-----------------------------------------

potiuk commented on pull request #5208: [AIRFLOW-4401] Fixup to: Use managers 
for Queue synchronization
URL: https://github.com/apache/airflow/pull/5208
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references 
them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
     - https://issues.apache.org/jira/browse/AIRFLOW-4401
     - In case you are fixing a typo in the documentation you can prepend your 
commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.
     - In case you are proposing a fundamental code change, you need to create 
an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)).
     - In case you are adding a dependency, check if the license complies with 
the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   Temporary fix for intermittent hangs until 
https://issues.apache.org/jira/browse/AIRFLOW-4440 is fixed.
   
   ### Tests
   
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
     1. Subject is separated from body by a blank line
     1. Subject is limited to 50 characters (not including Jira issue reference)
     1. Subject does not end with a period
     1. Subject uses the imperative mood ("add", not "adding")
     1. Body wraps at 72 characters
     1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
     - All the public functions and the classes in the PR contain docstrings 
that explain what they do
     - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   
   ### Code Quality
   
   - [x] Passes `flake8`
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
>                 Key: AIRFLOW-4401
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Jarek Potiuk
>            Priority: Major
>             Fix For: 1.10.4, 2.0.0
>
>
> After discussing potential reasons for the flakiness of the LocalExecutor
> tests (documented, for example, in AIRFLOW-4382) with [~ash] and [~BasPH],
> I took a deeper dive into the problem, and what I found raised the remaining
> hair on top of my head.
> We had a number of flaky tests in the local executor in which result_queue
> was not empty when it should have been emptied a moment before. More details
> and discussion can be found in
> [https://github.com/apache/airflow/pull/5159] 
> The problem turned out to be the unreliability of the
> multiprocessing.Queue.empty() implementation. It is not fully synchronized:
> it might return True even if a put() operation has already completed in
> another process. What's more, empty() might return True even if the queue's
> qsize() returns > 0 (!)
> It's a bit mind-boggling, but it is "as intended", as documented in
> [https://bugs.python.org/issue23582] (resolved as "not a bug" !!!!), and it
> is described in
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues]
> when you go into the details of how data is synchronized between processes.
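A minimal single-process sketch of the behaviour described above, assuming CPython 3's multiprocessing.Queue. put() only appends the item to a local buffer, and a background feeder thread writes it to the underlying pipe later. An empty() call made right after put() returns may therefore still report True. The helper name observe_empty_after_put is hypothetical, and the count it returns depends on timing (it can be zero).

```python
import multiprocessing


def observe_empty_after_put(trials=200):
    """Count how often empty() reports True immediately after put() returned.

    Hypothetical helper for illustration only. put() hands the item to a
    background feeder thread, which writes it to the queue's pipe
    asynchronously, so empty() may briefly disagree with what was put.
    """
    q = multiprocessing.Queue()
    anomalies = 0
    for i in range(trials):
        q.put(i)  # returns before the item necessarily reaches the pipe
        if q.empty():  # may be True although put() has already returned
            anomalies += 1
        # The item does arrive; a blocking get() with a timeout waits for it.
        assert q.get(timeout=5) == i
    q.close()
    q.join_thread()
    return anomalies
```

The same feeder-thread delay is what a consumer in another process observes, which is why code that drains a queue with `while not q.empty()` can stop too early.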
> A few people have stumbled upon this problem. For example 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  and [https://github.com/keras-team/autokeras/issues/368] 
> We also seem to have experienced this in Airflow before. In jobs.py, years
> ago (31.07.2016), we can see the comment below (but we used
> multiprocessing.Queue.empty() nevertheless):
> {code:java}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?){code}
> The solution proposed in [https://bugs.python.org/issue23582], using qsize(),
> works on Linux but is not really acceptable, because qsize() does not work
> on macOS (it raises NotImplementedError).
>  
> *Proposed solution 1) Synchronized Queue*
> [https://github.com/apache/airflow/pull/5199]
> Implement a more reliable queue (SynchronizedQueue) based on 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> (but we have to adjust the initialisation to match both the Python 2.7 and
> 3.5+ syntax, since we want to backport it to the stable v1.10 release).
> We should replace all usages of multiprocessing.Queue where empty() is used
> with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
> in a similar way in the future.
> Pros:
>  * rather straightforward replacement of queue -> SynchronizedQueue
>  * no extra processes needed - queues continue to be distributed without 
> central manager
>  * no need to cleanup the processes
> Cons:
>  * potential synchronization delays (likely negligible)
>  * we are adding our own SynchronizedQueue with slightly altered behaviour - 
> more code to manage
>  * the SynchronizedQueue implementation is still not fully reliable: you can
> have cases where empty() returns False but get_nowait() raises the Empty
> exception. This means that everywhere we rely on empty() returning False we
> have to use the potentially blocking get() to retrieve data
>  * Requires a (simple) backport to Python 2 for the v1-10 branch
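A minimal Python 3 sketch of solution 1, loosely following the idea in the lemon commit linked above. It is not the code from PR 5199, and the class names SharedCounter and SynchronizedQueue are illustrative. A process-shared counter is incremented before put() and decremented after a successful get(). empty() and qsize() read that counter instead of polling the pipe, which also avoids the macOS qsize() problem. The explicit ctx argument is the Python 3 initialisation detail that differs from 2.7. The counter is updated outside the queue's own locks, which is exactly why empty() can return False while get_nowait() still raises Empty.

```python
import multiprocessing
import multiprocessing.queues


class SharedCounter:
    """Integer counter shared between processes, guarded by its own lock."""

    def __init__(self, initial=0):
        self._value = multiprocessing.Value("i", initial)

    def increment(self, n=1):
        with self._value.get_lock():
            self._value.value += n

    @property
    def value(self):
        return self._value.value


class SynchronizedQueue(multiprocessing.queues.Queue):
    """multiprocessing.Queue whose empty()/qsize() use a shared counter."""

    def __init__(self, maxsize=0):
        # Python 3 requires an explicit multiprocessing context here.
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        self._size = SharedCounter(0)

    # Keep the counter when the queue is pickled for a child process
    # (needed with the "spawn" start method).
    def __getstate__(self):
        return super().__getstate__() + (self._size,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self._size = state[-1]

    def put(self, obj, block=True, timeout=None):
        # Count the item first so other processes see it immediately.
        self._size.increment(1)
        try:
            super().put(obj, block, timeout)
        except Exception:
            self._size.increment(-1)  # put() failed, e.g. queue.Full
            raise

    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)  # raises queue.Empty on failure
        self._size.increment(-1)
        return item

    def qsize(self):
        return self._size.value

    def empty(self):
        return self.qsize() == 0
```

put_nowait() and get_nowait() in the base class delegate to put() and get(), so they pick up the counter updates without being overridden.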
> *Proposed solution 2) Use managed queues*
> [https://github.com/apache/airflow/pull/5200]
> It seems that this unreliable behaviour only happens when the Queue is
> instantiated directly; the small delays between processes are gone when a
> shared Manager is used. In that case the Queue is really a proxy to a central
> Queue object running in a separate process, so synchronisation is implemented
> entirely via this single central queue:
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] .
> Using managed queues should solve the problem.
> Observations from tests confirm that this is the case: the tests are no
> longer flaky when managed queues are used.
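A minimal sketch of solution 2 using the standard library's multiprocessing.Manager. The helper name managed_queue_roundtrip is hypothetical, and this is not the code from PR 5200. Every proxy call such as put(), empty() or get_nowait() is a synchronous request to the single queue.Queue living in the manager's server process. Once put() has returned, empty() therefore reports False in any process. qsize() here is queue.Queue.qsize(), which is implemented on macOS as well.

```python
import multiprocessing


def managed_queue_roundtrip(items):
    """Put items on a managed queue and drain it using empty()/get_nowait()."""
    with multiprocessing.Manager() as manager:
        q = manager.Queue()  # proxy to a queue.Queue in the manager process
        for item in items:
            q.put(item)  # returns only after the manager has stored the item
        assert q.qsize() == len(items)
        drained = []
        while not q.empty():  # safe here: empty() and get_nowait() agree
            drained.append(q.get_nowait())
        return drained
```

Leaving the `with` block shuts the manager process down, which is the cleanup step discussed under the cons below.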
> Pros:
>  * Only the initialisation of the queues needs to change - no need to extend
> the Queue implementation
>  * Pythonic way - managers are part of the standard library and we can assume
> they are reliable and tested
>  * Such a managed queue is fully reliable - empty() and get_nowait() are
> perfectly in sync.
>  * Works the same for Python 2 and Python 3
> Cons:
>  * potential synchronization delays (likely negligible)
>  * since a separate process is started for each manager, cleanup is
> necessary and it is quite delicate, because shutting down the manager
> prevents access to the queue (Broken Pipe errors). Therefore the order of
> cleanup matters: first process everything, then clean up. This might have
> some undesirable side effects when shutting down Schedulers/Workers
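The cleanup ordering described above can be sketched as follows. The worker function square_task and its result format are hypothetical, and this is not the LocalExecutor code. The sequence is: wait for the producers, drain the managed queue while the manager process is still alive, and only then shut the manager down. Swapping the last two steps would make the drain fail, because the proxy can no longer reach the manager process.

```python
import multiprocessing


def square_task(task_id, result_queue):
    """Hypothetical worker: report (task_id, result) via the managed queue."""
    result_queue.put((task_id, task_id * task_id))


def run_tasks(num_tasks):
    manager = multiprocessing.Manager()
    try:
        result_queue = manager.Queue()
        workers = [
            multiprocessing.Process(target=square_task, args=(i, result_queue))
            for i in range(num_tasks)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        # 1. Process everything while the manager process is still running.
        results = []
        while not result_queue.empty():
            results.append(result_queue.get_nowait())
        return sorted(results)
    finally:
        # 2. Clean up last: after shutdown(), using result_queue fails
        #    (e.g. with a broken pipe or connection error).
        manager.shutdown()
```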



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
