potiuk opened a new issue #19934:
URL: https://github.com/apache/airflow/issues/19934


   ### Apache Airflow version
   
   main (development)
   
   ### Operating System
   
Unix with the 'spawn' method, macOS, Windows (spawn is the default there)
   
   ### Versions of Apache Airflow Providers
   
   Not relevant
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
Any deployment that uses the 'spawn' multiprocessing start method is susceptible to this problem. On Linux it is configured via the (undocumented) CORE__MP_START_METHOD="spawn" setting, but 'spawn' is also the default method on Windows and macOS.
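
For reference, a minimal sketch of how this is typically enabled. The `AIRFLOW__CORE__MP_START_METHOD` environment variable follows Airflow's `AIRFLOW__<SECTION>__<KEY>` convention, but since the option is undocumented, treat the exact key name here as an assumption based on this report rather than a documented API:

```
# Sketch only: enable the 'spawn' start method on Linux via the environment,
# assuming the [core] mp_start_method option named in this report.
import os

os.environ["AIRFLOW__CORE__MP_START_METHOD"] = "spawn"

# On Windows and macOS nothing needs to be set - Python's multiprocessing
# already defaults to the 'spawn' start method there.
```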
   
   
   ### What happened
   
Whenever the DagFileProcessor reloads the Airflow settings (which it always does when it starts), it also re-initializes the SQLAlchemy/ORM engine. In the `spawn` multiprocessing mode this re-initialization has an unintended side effect - it corrupts the SQLAlchemy session data in rather unpredictable ways. In our CI tests it made objects created after the reload invisible to the session that queried for them.
   
This is particularly disruptive because the "parent" process of the DagProcessor manager is currently the Scheduler, which relies heavily on ORM queries; any corruption of the session data can lead to problems that are extremely difficult to diagnose and debug, especially since such a reload may happen asynchronously.
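
To make it concrete why the settings reload runs again in the child at all, here is a minimal standalone sketch (plain Python, not Airflow code): a spawned child starts a fresh interpreter and re-imports the module, so any import-time / start-up initialization (such as creating an SQLAlchemy engine) runs again, whereas a forked child simply inherits the parent's already-initialized state.

```
# Standalone illustration of 'spawn' vs 'fork' (not Airflow code).
# Note: the 'fork' half of the loop only works on Linux/macOS.
import multiprocessing
import os

# With 'spawn' this module-level line is executed again in the child process;
# with 'fork' it only ever runs in the parent.
print(f"module-level initialization running in pid {os.getpid()}")

def child():
    print(f"child body running in pid {os.getpid()}")

if __name__ == "__main__":
    for method in ("spawn", "fork"):
        print(f"--- start method: {method} ---")
        ctx = multiprocessing.get_context(method)
        p = ctx.Process(target=child)
        p.start()
        p.join()
```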
   
**Note** - since the only "production" deployment of Airflow is Linux, and this feature is undocumented, it is highly unlikely that this problem will surface in production. However, in our CI environment and in environments where people use Airflow for development, it is quite likely to happen. This might already cause stability issues for macOS users (and in the future Windows users) who run Airflow locally for testing rather than running Airflow in production. While this is lower priority, this group of users is big and important enough that the problem should be fixed.
   
   
   ### What you expected to happen
   
We expect the "reload" not to happen - specifically, the ORM engine should not be re-initialized when the spawned DagProcessorManager starts. The reload is currently only needed because the logging configuration has to be recreated specifically for the DAG processor.
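
A rough sketch of the direction this suggests: recreate only the logging configuration in the spawned DagProcessorManager instead of re-running the full settings initialization. The `configure_logging` helper below exists in Airflow's `airflow.logging_config` module, but the wrapper function and the exact call site are assumptions, not the actual fix:

```
# Sketch only - not the actual fix. Assumes airflow.logging_config.configure_logging
# is available, as in current Airflow sources.
from airflow.logging_config import configure_logging

def prepare_spawned_dag_processor_logging():
    # Recreate just the logging configuration needed by the DAG processor...
    configure_logging()
    # ...and deliberately avoid airflow.settings.initialize() / configure_orm()
    # here, so the SQLAlchemy engine and session state are left untouched.
```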
   
Note: in the future, when the DAG processor is separated into its own process/command line (planned for the multi-tenant work), this will not be a problem. But the mode where the DAG processor remains a child process of Airflow is likely to stay, so we should fix it.
   
   ### How to reproduce
   
* Check out the Airflow code from 30 November 2021
* Add a `time.sleep(2)` call at the start of `_run_processor_manager` in `airflow/dag_processing/manager.py` (make sure `time` is imported):
   
   ```
       @staticmethod
       def _run_processor_manager(
           dag_directory: str,
           max_runs: int,
           processor_timeout: timedelta,
           signal_conn: MultiprocessingConnection,
           dag_ids: Optional[List[str]],
           pickle_dags: bool,
           async_mode: bool,
       ) -> None:
        time.sleep(2)  # <-- add the line here
           # Make this process start as a new process group - that makes it easy
           # to kill all sub-process of this at the OS-level, rather than having
           # to iterate the child processes
           os.setpgid(0, 0)
   ```
   
* Add a `time.sleep(10)` call in `test_scheduler_keeps_scheduling_pool_full` in `tests/jobs/test_scheduler_job.py`:
```
        # Create 5 dagruns for each DAG.
        # To increase the chances the TIs from the "full" pool will get
        # retrieved first, we schedule all TIs from the first dag first.
        for dr in _create_dagruns(dag_d1):
            scheduler._schedule_dag_run(dr, session)

        time.sleep(10)  # <- add the sleep here

        for dr in _create_dagruns(dag_d2):
            scheduler._schedule_dag_run(dr, session)
```
   
   
Run the two tests with pytest in the following way:
   
```
pytest \
  tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_multiprocessing_with_spawn_method \
  tests/jobs/test_scheduler_job.py::TestSchedulerJob::test_scheduler_keeps_scheduling_pool_full \
  -s
```
   
   Result:
   
![Screenshot from 2021-12-01 14-09-44](https://user-images.githubusercontent.com/595491/144269076-1e912faf-3a82-4f3a-b03f-ed34d730b33e.png)
   
In the logs you will also see the errors below, which are a manifestation of the broken session. The objects had been added only a few lines earlier, but because they were added after the reload, the session querying for them could no longer see them:
   
   ```
   [2021-12-01 13:08:52,842] {scheduler_job.py:1009} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
   [2021-12-01 13:08:52,848] {scheduler_job.py:1009} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
   [2021-12-01 13:08:52,854] {scheduler_job.py:1009} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
   [2021-12-01 13:08:52,861] {scheduler_job.py:1009} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
   [2021-12-01 13:08:52,867] {scheduler_job.py:1009} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
   ```
   
   What happens here:
   
* the Airflow code before December 2021 had a bug (fixed at the beginning of December) where the DAG file processor was not killed if it was slow to start (a race condition, see the sketch after this list)
* by adding the sleep() calls we force the race condition to become reproducible
* the first test, `test_scheduler_multiprocessing_with_spawn_method`, finished, but due to the race condition the DagProcessor had not been terminated when the test finished
* the second test started, but paused for longer between the two "create dagrun" loops
* the first "create dagrun" loop completed before the reload
* the reload happened in the spawned process (after 2s)
* the second "create dagrun" loop completed after the reload (after 10s)
* the dagruns created in the second loop were missing when the subsequent scheduler job code tried to retrieve them; the dagruns created before the reload were still accessible
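
A minimal sketch of the race in the first bullet (plain Python, not the Airflow or test code): a child that is slow to start outlives a parent-side cleanup that gives up on it too early.

```
# Sketch only - illustrates "slow-to-start child is never terminated".
import multiprocessing
import time

def slow_to_start():
    time.sleep(2)  # the artificially delayed start-up
    print("child started long after the parent moved on")

if __name__ == "__main__":
    p = multiprocessing.get_context("spawn").Process(target=slow_to_start)
    p.start()
    # A cleanup that checks for the child too early (the pre-fix behaviour)
    # decides there is nothing to terminate and moves on:
    time.sleep(0.5)
    print("parent finished its work, leftover child still alive:", p.is_alive())
    p.join()  # joined here only so the sketch itself exits cleanly
```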
   
   
   
   
   ### Anything else
   
A very long investigation, but I am happy we found it. A fix for the "race condition" is coming, and the "spawn" tests were moved to where they belong (dag_processor) with some more test cases added, but further changes are needed to solve the root cause - the reload of the ORM should not happen in the spawned process.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

