potiuk opened a new issue #19934:
URL: https://github.com/apache/airflow/issues/19934
### Apache Airflow version
main (development)
### Operating System
Unix with 'spawn' method, MacOS, Windows (spawn is default there)
### Versions of Apache Airflow Providers
Not relevant
### Deployment
Other
### Deployment details
Any deployment with 'spawn' mutliprocessing mode method used is susceptible
to this problem. This is configured via (undocumented) feature on Linux
CORE__MP_START_METHOD="spawn" but this is also default method for Windows and
MacOS envieronments.
### What happened
Whenever the DagFileProcessor performs relod of airflow settings (always
when it starts), it also re-initializes SQLAlchemy/ORM engine. This
reinitialization in `spawn` mode of multiprocessing has unintended side effect
- it corrupts the SQLAlchemy session data in rather unpredictable ways. In our
CI tests it resulted with making objects created after the reload unavailable
to the session that queried for those objects.
This is particularly disruptive as the "parent" process for DagProcessor
manager currently is Scheduler, which heavlly relies on lots of ORM queries and
any corruption of the session data might lead to extremely difficult to
diagnose and debug problems, especially that such reload might happen
asynchronously.
**Note** - since the only "production" deployment of Airflow is Linux, and
this feature is undocumented, it is highly unlikely this problem will cause a
problem in production. However in our CI environment and in environments where
people use Airflow for development, it is quite likely to happen. This might
already cause stability issues for MacOS users (and in the future Windows
users) who run Airlfow locally to run tests rather than use Airflow in
Production. While this is lower priority, it's still big enough and important
group of users so that this problem should be fixed.
### What you expected to happen
We expect the "reload" is not happening - and specifically that the ORM
engine will not be reinitialized when spawned DagProcesorManager starts. This
is only needed currently because the logging configuration needs to be
recreated, specifically for DAG processor.
Note: In the future, when dag processor is separated to a separate
process/command line (planned for the multi-tenant work), this will not be a
problem. But the mode where DAG processor remains child process of Airflow is
likely to stay, so we should fix it.
### How to reproduce
* Checkout the code of Airflow from 30 November 2021
* Add `sleep(2)` command to initialization of the `_run_processor_manager`
in `airflow/dag_processing/manager.py`
```
@staticmethod
def _run_processor_manager(
dag_directory: str,
max_runs: int,
processor_timeout: timedelta,
signal_conn: MultiprocessingConnection,
dag_ids: Optional[List[str]],
pickle_dags: bool,
async_mode: bool,
) -> None:
time.sleep(2) # <-- add the liine here
# Make this process start as a new process group - that makes it easy
# to kill all sub-process of this at the OS-level, rather than having
# to iterate the child processes
os.setpgid(0, 0)
```
* Add `sleep(10) command in the `test_scheduler_keeps_scheduling_pool_full`
of `tests/jobs/test_scheduler_job.py`
```
# Create 5 dagruns for each DAG.
# To increase the chances the TIs from the "full" pool will get
retrieved first, we schedule all
# TIs from the first dag first.
for dr in _create_dagruns(dag_d1):
scheduler._schedule_dag_run(dr, session)
time.sleep(10) # <- Add sleep command here
for dr in _create_dagruns(dag_d2):
scheduler._schedule_dag_run(dr, session)
```
Run pytest tests in the following way:
```
pytest
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_multiprocessing_with_spawn_method
tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_keeps_scheduling_pool_full
-s
```
Result:

In the logs you will also see the below, which is the manifestation of
session being broken. The object were added few lines before, but since they
were added already after the reloaad hae
```
[2021-12-01 13:08:52,842] {scheduler_job.py:1009} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-12-01 13:08:52,848] {scheduler_job.py:1009} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-12-01 13:08:52,854] {scheduler_job.py:1009} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-12-01 13:08:52,861] {scheduler_job.py:1009} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-12-01 13:08:52,867] {scheduler_job.py:1009} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
```
What happens here:
* the code in Aiflow before December 2021 had a bug (fixed at the beginning
of December) where dag file processor has not been killed if it was slow to
start (race condition)
* by adding sleep() command we are forcing the race condition to be
reproducible
* the first test `test_scheduler_multiprocessing_with_spawn_method`
finished, but due to the race condition DAGProcessor has not been terminated
when the test finished
* the second test started but paused for longer between two "create dagrun"
loops
* the first dagrun completed before reload
* the reload happened in the spawned process (after 2s)
* the second dagrun completed after the reload (after 10s)
* The dagruns created in the second loop were missing when the subsequent
code in scheduler job tried to retrieve them. the dagruns created before - were
accessible.
### Anything else
Very long investigation, but I am happy we found it. Fix is coming to the
"race condition" and the "spawn" tests were moved to where they belong
(dag_processor) with some more test cases added, but some further changes are
needed to solve the root cause - reload of the ORM should not happen in the
spawned process.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]