[https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16828017#comment-16828017]
ASF GitHub Bot commented on AIRFLOW-4401:
-----------------------------------------
potiuk commented on pull request #5199: [AIRFLOW-4401] SynchronizedQueue used
where empty() is used.
URL: https://github.com/apache/airflow/pull/5199
Make sure you have checked _all_ steps below.
### Jira
- [x] My PR addresses the following [Airflow
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references
them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
- https://issues.apache.org/jira/browse/AIRFLOW-4401
- In case you are fixing a typo in the documentation you can prepend your
commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.
- In case you are proposing a fundamental code change, you need to create
an Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)).
- In case you are adding a dependency, check if the license complies with
the [ASF 3rd Party License
Policy](https://www.apache.org/legal/resolved.html#category-x).
### Description
- [x] Here are some details about my PR, including screenshots of any UI
changes:
It is a known problem (https://bugs.python.org/issue23582) that the empty()
method of multiprocessing.Queue is not reliable: sometimes it returns True
even though another process has already put something in the queue.
As a result, some tasks were not picked up when the sync() methods were
called (in AirflowKubernetesScheduler, LocalExecutor and DagFileProcessor).
During regular sync() calls this was less of a problem, because the remaining
jobs/files could be processed in the next pass. It was a problem in tests and
during graceful shutdown, though: some tasks could still be unprocessed when
the shutdown occurred.
All the affected cases now follow the same pattern:
```python
while not queue.empty():
    res = queue.get()
    ...
```
This loop always runs in a single (main) process, so it is safe to run it
this way. No other process can retrieve the data from the queue between
empty() and get().
Note that with SynchronizedQueue, unlike the standard multiprocessing.Queue,
you cannot rely on the data being immediately available once empty() returns
False. Be prepared for a subsequent get_nowait() to raise Empty, or (better)
use get() to retrieve the data.
In all these cases the overhead of inter-process locking is negligible
compared to the action executed (parsing a DAG, executing a job), so it
appears safe to merge the change.
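The following sketch may help make the get_nowait() caveat above concrete. It shows a counter-backed queue that loosely follows the shared-counter approach of the lemon commit referenced in the Jira issue. It is not the code from this PR. The class body, the `_size` counter and the `_add()` helper are hypothetical and exist only for illustration. The idea is that `put()` bumps a shared, lock-protected counter before the base class hands the item to its feeder thread. As a result, `empty()` already returns False when `put()` returns, even though the item may still be on its way to the pipe.

```python
import multiprocessing
import multiprocessing.queues


class SynchronizedQueue(multiprocessing.queues.Queue):
    """Hypothetical sketch (not the PR's code): a multiprocessing queue whose
    empty()/qsize() read a shared, lock-protected counter instead of polling
    the underlying pipe."""

    def __init__(self, maxsize=0):
        ctx = multiprocessing.get_context()
        # Python 3.4+: the base class takes a required keyword-only ``ctx``.
        super().__init__(maxsize, ctx=ctx)
        self._size = ctx.Value("i", 0)  # shared int guarded by its own lock

    def _add(self, delta):
        with self._size.get_lock():
            self._size.value += delta

    def put(self, obj, block=True, timeout=None):
        # Count the item before the base put() hands it to the feeder thread,
        # so empty() is already False once put() returns in any process,
        # even if the item has not reached the pipe yet.
        self._add(1)
        try:
            super().put(obj, block, timeout)
        except BaseException:
            self._add(-1)  # e.g. queue.Full from a bounded put_nowait()
            raise

    def get(self, block=True, timeout=None):
        item = super().get(block, timeout)
        self._add(-1)  # only uncount items that were actually removed
        return item

    def qsize(self):
        # Unlike the base qsize(), this does not rely on sem_getvalue(),
        # so it also works on macOS.
        return self._size.value

    def empty(self):
        return self.qsize() == 0

    # The base class pickles only its own fields when the queue is passed to
    # a spawned child process, so carry the shared counter along explicitly.
    def __getstate__(self):
        return super().__getstate__(), self._size

    def __setstate__(self, state):
        base_state, self._size = state
        super().__setstate__(base_state)


if __name__ == "__main__":
    q = SynchronizedQueue()
    for i in range(3):
        q.put(i)
    results = []
    while not q.empty():  # single consumer: safe to pair empty() with get()
        results.append(q.get())
    print(results)  # prints [0, 1, 2]
```

With a single consumer, the drain loop retrieves every item that was counted before the check. A blocking `get()` simply waits for the feeder thread to flush the item. A `get_nowait()` at the same moment could still raise `Empty`. The `__getstate__`/`__setstate__` overrides are a choice made in this sketch. Without them, a child started with the spawn start method would receive the queue without its counter.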
### Tests
- [x] My PR adds the following unit tests __OR__ does not need testing for
this extremely good reason:
No need: there are already plenty of tests covering this (the flaky ones).
### Commits
- [x] My commits all reference Jira issues in their subject lines, and I
have squashed multiple commits if they address the same issue. In addition, my
commits follow the guidelines from "[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
1. Subject is limited to 50 characters (not including Jira issue reference)
1. Subject does not end with a period
1. Subject uses the imperative mood ("add", not "adding")
1. Body wraps at 72 characters
1. Body explains "what" and "why", not "how"
### Documentation
- [x] In case of new functionality, my PR adds documentation that describes
how to use it.
- All the public functions and the classes in the PR contain docstrings
that explain what they do
- If you implement backwards incompatible changes, please leave a note in
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so
we can assign it to an appropriate release
### Code Quality
- [x] Passes `flake8`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
> Key: AIRFLOW-4401
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Jarek Potiuk
> Priority: Major
> Fix For: 1.10.4
>
>
> After discussing potential reasons for the flakiness of the LocalExecutor
> tests with [~ash] and [~BasPH], I took a deeper dive into the problem, and
> what I found raised the remaining hair on top of my head.
> We had a number of flaky LocalExecutor tests in which result_queue was not
> empty at a point where it should have been emptied a moment before. More
> details and discussion can be found in
> [https://github.com/apache/airflow/pull/5159]
> The problem turned out to be ... the unreliability of the
> multiprocessing.Queue empty() implementation. multiprocessing.Queue.empty()
> is not fully synchronized, and it might return True even if a put()
> operation has already completed in another process. What's more, empty()
> might return True even when qsize() of the queue returns > 0 (!)
> It's a bit mind-boggling, but it is "as intended", as documented in
> [https://bugs.python.org/issue23582] (resolved as "not a bug" !!!!)
> A few people have stumbled upon this problem. For example
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> and [https://github.com/keras-team/autokeras/issues/368]
> It seems we had also experienced this in Airflow before. In jobs.py, years
> ago (31.07.2016), we can see the comment below (but we used
> multiprocessing.Queue empty() nevertheless):
> {code:python}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?)
> {code}
> The workaround proposed in [https://bugs.python.org/issue23582], using
> qsize(), works on Linux but is not really acceptable, because qsize() does
> not work on macOS (it raises NotImplementedError).
> The working solution is to implement a reliable queue (SynchronizedQueue)
> based on
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> (but with a twist: the __init__ of a class deriving from Queue has to be
> changed for Python 3.4+, as described in
> [https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).
> Luckily, we are now on Python 3.5+.
> We should replace all usages of multiprocessing.Queue where empty() is used
> with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
> in a similar way in the future.
>
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)