[ https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830149#comment-16830149 ]

ASF subversion and git services commented on AIRFLOW-4401:
----------------------------------------------------------

Commit af7902d33dd45bae0d9884e5f1889f5dbebd9340 in airflow's branch 
refs/heads/v1-10-test from Jarek Potiuk
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=af7902d ]

[AIRFLOW-4401] Use managers for Queue synchronization (#5200)

It is a known problem (https://bugs.python.org/issue23582) that the
multiprocessing.Queue empty() method is not reliable - it can return True
even if another process has already put something in the queue.

This resulted in some tasks not being picked up when the sync() methods
were called (in AirflowKubernetesScheduler, LocalExecutor,
DagFileProcessor). This was less of a problem during regular sync() passes -
the remaining jobs/files could simply be processed in the next pass - but it
was a problem in tests and when a graceful shutdown was executed (some tasks
could still be unprocessed while the shutdown occurred).

We switched to Manager()-managed queues to handle that - the queue in this case
runs in a separate subprocess and every process that uses it accesses the shared
queue through a proxy. Additionally, all queues returned by managers are joinable
queues, so we run task_done() after processing each item and we can now call
join() in the termination/end code to wait until all tasks are actually processed,
not only retrieved from the queue. That makes shutdown more graceful.

All the cases impacted follow the same general pattern now:

while True:
    try:
        res = queue.get_nowait()
        try:
            pass  # ... do some processing of res
        finally:
            queue.task_done()
    except Empty:  # the standard library queue.Empty (Queue.Empty on Python 2)
        break

In all these cases the overhead of inter-process locking is negligible
compared to the action executed (parsing a DAG, executing a job),
so the approach appears to be safe for concurrency as well.
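
For illustration, a minimal self-contained sketch of the whole flow - a
Manager-backed joinable queue, the consumer loop above and join() at shutdown.
The names (consume, worker) are illustrative, not the actual Airflow code:

    import multiprocessing
    from queue import Empty  # manager queue proxies raise the standard queue.Empty

    def consume(task_queue):
        while True:
            try:
                res = task_queue.get_nowait()
                try:
                    pass  # ... do some processing of res
                finally:
                    task_queue.task_done()
            except Empty:
                break

    if __name__ == '__main__':
        manager = multiprocessing.Manager()   # starts the manager subprocess
        task_queue = manager.Queue()          # proxy to a central, joinable queue
        for i in range(5):
            task_queue.put(i)
        worker = multiprocessing.Process(target=consume, args=(task_queue,))
        worker.start()
        task_queue.join()    # wait until every item was processed, not just retrieved
        worker.join()
        manager.shutdown()   # only after all queue access has finished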

(cherry picked from commit 6952b196863aeb50632656bb4004129159ff573e)


> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
>                 Key: AIRFLOW-4401
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Jarek Potiuk
>            Priority: Major
>             Fix For: 1.10.4
>
>
> After discussing with [~ash] and [~BasPH] potential reasons for flakiness of 
> LocalExecutor tests (documented for example in AIRFLOW-4382), I took a deeper 
> dive into the problem and what I found raised the remaining hair on top of my 
> head. 
> We had a number of flaky tests in the LocalExecutor that resulted in the 
> result_queue not being empty even though it should have been emptied a moment 
> before. More details and discussion can be found in 
> [https://github.com/apache/airflow/pull/5159] 
> The problem turned out to be the unreliability of the multiprocessing.Queue 
> empty() implementation. It is not fully synchronized, so it might return True 
> even if a put() operation has already completed in another process. What's 
> more - empty() might return True even while qsize() of the same queue returns 
> a value greater than 0 (!) 
> It's a bit mind-boggling, but it is "as intended", as documented in 
> [https://bugs.python.org/issue23582] (resolved as "not a bug"!), and it is 
> described in 
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues] 
> once you go into the details of how data is synchronized between processes.
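> As a rough illustration of that behaviour (it is timing-dependent, so it may 
> or may not reproduce on a given run and platform) - even within a single 
> process the item travels through an internal feeder thread before empty() can 
> see it:
> {code:python}
> import multiprocessing
> 
> q = multiprocessing.Queue()
> q.put("task")
> # There is no guarantee that the item is visible yet - the feeder thread may
> # still be flushing it to the underlying pipe, so this can print True:
> print(q.empty())
> {code}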
> A few people have stumbled upon this problem. For example 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  and [https://github.com/keras-team/autokeras/issues/368] 
> Also, it seems we have experienced this in Airflow before. In jobs.py, years 
> ago (31.07.2016), we can see the comment below (but we kept using the 
> multiprocessing queue's empty() nevertheless):
> {code:java}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?){code}
> The workaround suggested in [https://bugs.python.org/issue23582] - using 
> qsize() - worked on Linux but is not really acceptable, because qsize() does 
> not work on macOS (it throws NotImplementedError).
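> For example (illustrative only):
> {code:python}
> import multiprocessing
> 
> q = multiprocessing.Queue()
> q.put("task")
> try:
>     print(q.qsize())   # works on Linux
> except NotImplementedError:
>     # macOS does not implement sem_getvalue(), which qsize() relies on
>     print("qsize() is not available on this platform")
> {code}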
>  
> *Proposed solution 1) Synchronized Queue*
> [https://github.com/apache/airflow/pull/5199]
> Implement a more reliable queue (SynchronizedQueue) based on 
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
>  (but we have to adjust the initialisation to match both the 2.7 and 3.5+ 
> syntax, since we want to backport this to the stable v1.10 release). We should 
> replace all usages of multiprocessing.Queue where empty() is used with the 
> SynchronizedQueue, and make sure we do not use multiprocessing.Queue in a 
> similar way in the future. A rough sketch of this approach follows after the 
> pros/cons below.
> Pros:
>  * rather straightforward replacement: Queue -> SynchronizedQueue
>  * no extra processes needed - the queues continue to be distributed, without 
> a central manager
>  * no need to clean up extra processes
> Cons:
>  * potential synchronization delays (likely negligible)
>  * we are adding our own SynchronizedQueue with slightly altered behaviour - 
> more code to maintain
>  * the SynchronizedQueue implementation is still not fully reliable - there 
> can be cases where empty() returns False but get_nowait() raises the Empty 
> exception. This means that everywhere we rely on empty() returning False we 
> have to use a potentially blocking get() to retrieve the data
>  * requires backporting to Python 2 for the v1-10 branch (simple, but still 
> needed)
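> For illustration, a rough sketch of what such a SynchronizedQueue could look 
> like - simplified, Python 3 only, and not the exact code from the PR or the 
> linked lemon commit:
> {code:python}
> import multiprocessing
> import multiprocessing.queues
> 
> 
> class SharedCounter(object):
>     """Process-safe counter backed by a shared Value."""
> 
>     def __init__(self, n=0):
>         self.count = multiprocessing.Value('i', n)
> 
>     def increment(self, n=1):
>         with self.count.get_lock():
>             self.count.value += n
> 
>     @property
>     def value(self):
>         return self.count.value
> 
> 
> class SynchronizedQueue(multiprocessing.queues.Queue):
>     """Queue whose empty()/qsize() consult a counter that is updated together
>     with put()/get(), instead of the feeder-thread/pipe state."""
> 
>     def __init__(self, *args, **kwargs):
>         # Python 3 requires an explicit ctx argument here; Python 2.7 does not
>         # accept it - this is the initialisation difference mentioned above.
>         super(SynchronizedQueue, self).__init__(
>             *args, ctx=multiprocessing.get_context(), **kwargs)
>         self.size = SharedCounter(0)
> 
>     def put(self, *args, **kwargs):
>         self.size.increment(1)
>         super(SynchronizedQueue, self).put(*args, **kwargs)
> 
>     def get(self, *args, **kwargs):
>         item = super(SynchronizedQueue, self).get(*args, **kwargs)
>         self.size.increment(-1)
>         return item
> 
>     def qsize(self):
>         return self.size.value
> 
>     def empty(self):
>         # Still not bullet-proof: the counter may run ahead of the feeder
>         # thread, so get_nowait() can raise Empty even when empty() is False.
>         return not self.qsize()
> 
>     # Preserve the counter when the queue is pickled into a child process.
>     def __getstate__(self):
>         return (super(SynchronizedQueue, self).__getstate__(), self.size)
> 
>     def __setstate__(self, state):
>         parent_state, self.size = state
>         super(SynchronizedQueue, self).__setstate__(parent_state)
> {code}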
> *Proposed solution 2): Use managed queues*
> [https://github.com/apache/airflow/pull/5200]
> It seems that this unreliable behaviour of Queue only happens when the Queue 
> is instantiated directly; the small delays between processes are gone when a 
> shared Manager is used. In that case the Queue is really a proxy to a central 
> queue object running in a separate process, so synchronisation is implemented 
> fully via this single central queue: 
> [https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues]. 
> Using managed queues should solve the problem.
> Observations from tests confirm that this is the case - the tests are no 
> longer flaky when managed queues are used.
> Pros:
>  * only the initialisation of the queues needs to be changed - no need to 
> extend the Queue implementation
>  * Pythonic way - managers are part of the standard library and we can assume 
> they are reliable and tested
>  * such a managed queue is fully reliable - empty() and get_nowait() are 
> perfectly in sync
>  * works the same for Python 2 and Python 3
> Cons:
>  * potential synchronization delays (likely negligible)
>  * since a separate process is started for each manager, cleanup is necessary 
> and it is quite delicate, because shutting down the manager prevents any 
> further access to the queue (Broken Pipe errors). Therefore the sequence of 
> cleanup is important - first process everything, then clean up (see the sketch 
> below). This might have some undesirable side effects when shutting down 
> Schedulers/Workers
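> A minimal sketch of that ordering constraint (illustrative, not the actual 
> executor shutdown code):
> {code:python}
> import multiprocessing
> 
> manager = multiprocessing.Manager()   # the extra manager process starts here
> result_queue = manager.Queue()
> 
> # ... start workers that put()/get()/task_done() on result_queue ...
> 
> # Correct order: finish all queue work first ...
> result_queue.join()     # wait until every queued item got task_done()
> # ... and only then stop the manager; touching result_queue after this point
> # fails with errors such as BrokenPipeError:
> manager.shutdown()
> {code}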



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
