[ https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16828944#comment-16828944 ]
Jarek Potiuk commented on AIRFLOW-4401:
---------------------------------------

[~ash] and [~BasPH]: both solution candidates are now green in Travis, and I think both solve the problem. So it's a good time to review them and decide which one to choose.
* [https://github.com/apache/airflow/pull/5199]
* [https://github.com/apache/airflow/pull/5200]

> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
> Key: AIRFLOW-4401
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Jarek Potiuk
> Priority: Major
> Fix For: 1.10.4
>
> After discussing with [~ash] and [~BasPH] potential reasons for the flakiness of the LocalExecutor tests (documented, for example, in AIRFLOW-4382), I took a deeper dive into the problem, and what I found raised the remaining hair on top of my head.
> We had a number of flaky LocalExecutor tests in which result_queue was not empty even though it should have been emptied a moment before. More details and discussion can be found in [https://github.com/apache/airflow/pull/5159].
> The problem turned out to be the unreliability of the multiprocessing.Queue.empty() implementation. It is not fully synchronized: it might return True even if a put() operation has already completed in another process. What's more, empty() might return True even when qsize() of the queue returns a value > 0 (!)
> It's a bit mind-boggling, but it is "as intended", as documented in [https://bugs.python.org/issue23582] (resolved as "not a bug"!), and it is described in [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] when you go into the details of how data is synchronized between processes.
> A few people have stumbled upon this problem.
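The multiprocessing documentation linked above explains the mechanism: put() hands the object to a background feeder thread, which writes it to a pipe later, so for a short time empty() can still report True. The same effect can be observed within a single process. The sketch below uses a hypothetical helper, put_then_check, to show that the result of empty() right after put() is only a timing-dependent snapshot, while a blocking get() with a timeout waits for the data to arrive.

```python
import multiprocessing


def put_then_check(timeout=5.0):
    """Hypothetical demo: empty() right after put() is only a racy snapshot."""
    q = multiprocessing.Queue()
    q.put("result")
    # put() only appends the object to an internal buffer; a background
    # feeder thread pickles it and writes it to the pipe later, so this
    # snapshot may still be True.
    snapshot = q.empty()
    # A blocking get() waits until the data has actually reached the pipe.
    item = q.get(timeout=timeout)
    # Release the queue and wait for its feeder thread to finish.
    q.close()
    q.join_thread()
    return snapshot, item


if __name__ == "__main__":
    snapshot, item = put_then_check()
    print("empty() right after put():", snapshot)  # timing-dependent
    print("get() returned:", item)
```

Code that decides "nothing left to process" from empty() alone is therefore exposed to exactly the flakiness described in this issue.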
> Examples include [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f] and [https://github.com/keras-team/autokeras/issues/368].
> It also seems we experienced this in Airflow before. In jobs.py, years ago (31.07.2016), we can see the comment below (but we used multiprocessing.Queue.empty() nevertheless):
> {code:python}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?)
> {code}
> The solution proposed in [https://bugs.python.org/issue23582], using qsize(), works on Linux but is not really acceptable because qsize() does not work on macOS (it raises NotImplementedError).
>
> *Proposed solution 1) Synchronized Queue*
> [https://github.com/apache/airflow/pull/5199]
> Implement a more reliable queue (SynchronizedQueue) based on [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f] (but we have to adjust the initialisation to match both 2.7 and 3.5+ syntax, since we want to backport it to the stable v1.10 release).
> We should replace all usages of multiprocessing.Queue where empty() is used with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue in a similar way in the future.
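One way to build such a queue is sketched below. It is an illustration of the idea, not the code from PR 5199 or the lemon commit, and it assumes a shared multiprocessing.Value counter that put() increments and get() decrements. Because qsize() and empty() read that counter instead of the queue's semaphore, they also work on macOS, where the stock qsize() raises NotImplementedError. The sketch is Python 3 only (it passes the ctx keyword that multiprocessing.queues.Queue requires there).

```python
import multiprocessing
import multiprocessing.queues


class SynchronizedQueue(multiprocessing.queues.Queue):
    """Hypothetical sketch: a Queue that tracks its size in a shared counter.

    qsize()/empty() read the counter instead of the queue's semaphore, so they
    also work where Queue.qsize() raises NotImplementedError (e.g. macOS).
    """

    def __init__(self, maxsize=0):
        ctx = multiprocessing.get_context()
        super().__init__(maxsize, ctx=ctx)
        # Shared integer (with its own lock) visible to every process.
        self._size = ctx.Value("i", 0)

    # Pickling support: child processes must receive the same shared counter.
    def __getstate__(self):
        return super().__getstate__() + (self._size,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self._size = state[-1]

    def put(self, obj, block=True, timeout=None):
        super().put(obj, block, timeout)
        # Counted as soon as put() returns, i.e. possibly before the feeder
        # thread has flushed the data to the pipe.
        with self._size.get_lock():
            self._size.value += 1

    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)
        with self._size.get_lock():
            self._size.value -= 1
        return item

    def qsize(self):
        return self._size.value

    def empty(self):
        return self.qsize() == 0
```

The inherited put_nowait() and get_nowait() delegate to put() and get(), so they keep the counter in sync too. Note, however, that the counter is incremented before the feeder thread has written the data, so empty() can return False while get_nowait() still raises queue.Empty; this is the remaining unreliability listed among the cons of this solution, and the reason consumers should use a blocking get() (optionally with a timeout).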
> Pros:
> * rather straightforward replacement of Queue -> SynchronizedQueue
> * no extra processes needed: queues continue to be distributed without a central manager
> * no need to clean up extra processes
> Cons:
> * potential synchronization delays (likely negligible)
> * we are adding our own SynchronizedQueue with slightly altered behaviour: more code to maintain
> * the SynchronizedQueue implementation is still not fully reliable: you can have cases where empty() returns False but get_nowait() raises the Empty exception. This means that everywhere we depend on the queue being non-empty, we have to use the potentially blocking get() to retrieve data
> * requires (simple) backporting to Python 2 for the v1-10 branch
>
> *Proposed solution 2) Use managed queues*
> [https://github.com/apache/airflow/pull/5200]
> It seems that this unreliable behaviour only happens when the Queue is instantiated directly; the small delays between processes are gone when a multiprocessing Manager is used. In that case the Queue is really a proxy to a central Queue object running in a separate process, so synchronisation is implemented entirely via this single central queue: [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues].
> Using managed queues should solve the problem. Observations from the tests confirm this: the tests are no longer flaky when managed queues are used.
> Pros:
> * only the initialisation of queues needs to change; no need to extend the Queue implementation
> * Pythonic way: managers are part of the standard library, and we can assume they are reliable and tested
> * such a managed queue is fully reliable: empty() and get_nowait() are perfectly in sync
> * works the same for Python 2 and Python 3
> Cons:
> * potential synchronization delays (likely negligible)
> * since a separate process is started for each manager, cleanup is necessary and quite delicate, because shutting down the manager prevents access to the queue (Broken Pipe errors). Therefore the order of cleanup matters: first process everything, then clean up. This might have some undesirable side effects when shutting down Schedulers/Workers.
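Solution 2 can be sketched with the standard multiprocessing.Manager(). Only the queue construction changes (manager.Queue() instead of multiprocessing.Queue()). The worker and run functions are hypothetical names chosen for illustration; they also show the cleanup order described in the cons: drain the queue while the manager is still running, and only then shut it down (here by leaving the with block). The sketch is written for Python 3.

```python
import multiprocessing


def worker(result_queue, n):
    """Hypothetical worker: report n results through the shared queue."""
    for i in range(n):
        result_queue.put(i * i)


def run(n_workers=2, per_worker=3):
    results = []
    # Manager() starts a server process; manager.Queue() returns a proxy to a
    # single queue living in that process.
    with multiprocessing.Manager() as manager:
        result_queue = manager.Queue()
        procs = [
            multiprocessing.Process(target=worker, args=(result_queue, per_worker))
            for _ in range(n_workers)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Each proxied put() returns only after the server has stored the
        # item, so after join() empty() and get_nowait() agree.
        while not result_queue.empty():
            results.append(result_queue.get_nowait())
    # Leaving the with block shuts the manager down; result_queue must not be
    # used from here on, so all processing happens above.
    return sorted(results)


if __name__ == "__main__":
    print(run())
```

Moving the draining loop after the with block would talk to a manager process that has already stopped, which is the kind of Broken Pipe failure mentioned above.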