potiuk edited a comment on pull request #19860:
URL: https://github.com/apache/airflow/pull/19860#issuecomment-982481602


   OK. I have very interesting findings and I might need some brainstorming. 
@ashb @ephraimbuddy @uranusjr.
   
   I think I know what is causing the problem, but I have not figured out the 
mechanism (yet). I am running some experiments to get more data, but the problem 
with `test_scheduler_keeps_scheduling_pool_full` is most likely caused by a side 
effect of `test_scheduler_multiprocessing_with_spawn_method` - and I am afraid it 
could have real consequences on systems that use the `spawn` method for 
multiprocessing.
   
   This seems to be caused by a deferred re-import of `airflow.settings` 
triggered by the DagProcessor from the `spawn` test. However, I am scratching my 
head and cannot figure out how such a reload in a - seemingly - separate process, 
with a Python interpreter started from scratch (this is how `spawn` works), could 
influence tests running in a different process. It might be related to some 
shared memory or resources at the "driver" level for the database (this is the 
only hypothesis left).
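To illustrate the kind of cross-process coupling I mean (this is only an analogy with stdlib `sqlite3`, not Airflow's actual setup): two otherwise independent connections to a shared database resource can trample on each other's state.

```python
import sqlite3

# Two connections to the SAME shared-cache in-memory database -- loosely
# analogous to two processes sharing a database resource at the driver level.
uri = "file:demo?mode=memory&cache=shared"
conn_a = sqlite3.connect(uri, uri=True)
conn_b = sqlite3.connect(uri, uri=True)

conn_a.execute("CREATE TABLE dag (dag_id TEXT)")
conn_a.execute("INSERT INTO dag VALUES ('d2')")
conn_a.commit()

# A "cleanup" issued through the second connection silently removes the row
# the first connection still expects to find.
conn_b.execute("DELETE FROM dag")
conn_b.commit()

rows = conn_a.execute("SELECT * FROM dag").fetchall()
# rows is now empty -- conn_a "can't find" the dag it just created
```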
   
   Some Observations:
   
   This problem is always accompanied by this log entry (which, I believe, is 
generated by the dag processor reloading settings): 
   
   ```
     [2021-11-29 19:14:45,094] {settings.py:52} INFO - Configured default 
timezone Timezone('UTC')
   ```
   
   This entry appears asynchronously at various places - sometimes during the 
test, sometimes in `setup` (the latter proves that this is a side effect 
of an earlier test and not generated by the test itself). Depending on when 
the "reload" happens, the effects on the test differ slightly.
   
   * First of all, `test_scheduler_keeps_scheduling_pool_full` runs right 
after `test_scheduler_multiprocessing_with_spawn_method`, because that's their 
order in the test suite.
   * I tested this by disabling the 
`test_scheduler_multiprocessing_with_spawn_method` test. When that test is 
enabled, the problem happens rather consistently (1-2 failures per CI 
build). When it is disabled, 
`test_scheduler_keeps_scheduling_pool_full` consistently passes.
   * I am (just now) running an experiment where I moved the `spawn` test case 
after `test_scheduler_keeps_scheduling_pool_full`, and I will see what 
effect it has (I guess the following tests might start failing).
   
   I see three interesting cases:
   
   Reload happening after or around the second DAG sync - here only d2 was 
missing:
   
   ```
     ---------------------------- Captured stdout setup 
-----------------------------
     [2021-11-29 19:14:44,996] {dagbag.py:500} INFO - Filling up the DagBag 
from /dev/null
     ------------------------------ Captured log setup 
------------------------------
     INFO     airflow.models.dagbag.DagBag:dagbag.py:500 Filling up the DagBag 
from /dev/null
     ----------------------------- Captured stdout call 
-----------------------------
     [2021-11-29 19:14:45,011] {dag.py:2395} INFO - Sync 1 DAGs
     [2021-11-29 19:14:45,015] {dag.py:2414} INFO - Creating ORM DAG for 
test_scheduler_keeps_scheduling_pool_full_d1
     [2021-11-29 19:14:45,021] {dag.py:2934} INFO - Setting next_dagrun for 
test_scheduler_keeps_scheduling_pool_full_d1 to 2016-01-01T00:00:00+00:00
     [2021-11-29 19:14:45,041] {dag.py:2395} INFO - Sync 1 DAGs
     [2021-11-29 19:14:45,044] {dag.py:2414} INFO - Creating ORM DAG for 
test_scheduler_keeps_scheduling_pool_full_d2
     [2021-11-29 19:14:45,050] {dag.py:2934} INFO - Setting next_dagrun for 
test_scheduler_keeps_scheduling_pool_full_d2 to 2016-01-01T00:00:00+00:00
     [2021-11-29 19:14:45,094] {settings.py:52} INFO - Configured default 
timezone Timezone('UTC')  <--- HERE RELOAD HAPPENED
     [2021-11-29 19:14:45,190] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-29 19:14:45,198] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-29 19:14:45,207] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-29 19:14:45,221] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-29 19:14:45,235] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-29 19:14:45,248] {scheduler_job.py:287} INFO - 2 tasks up for 
execution:
   
   ```
   
   Reload happening before DAG syncing (here both dags are missing):
   
   ```
     ---------------------------- Captured stdout setup 
-----------------------------
     [2021-11-28 17:37:58,370] {settings.py:52} INFO - Configured default 
timezone Timezone('UTC')  <-- HERE RELOAD HAPPENED
     [2021-11-28 17:37:58,372] {dagbag.py:500} INFO - Filling up the DagBag 
from /dev/null
     ------------------------------ Captured log setup 
------------------------------
     INFO     airflow.models.dagbag.DagBag:dagbag.py:500 Filling up the DagBag 
from /dev/null
     ----------------------------- Captured stdout call 
-----------------------------
     [2021-11-28 17:37:58,381] {dag.py:2395} INFO - Sync 1 DAGs
     [2021-11-28 17:37:58,384] {dag.py:2414} INFO - Creating ORM DAG for 
test_scheduler_keeps_scheduling_pool_full_d1
     [2021-11-28 17:37:58,388] {dag.py:2928} INFO - Setting next_dagrun for 
test_scheduler_keeps_scheduling_pool_full_d1 to 2016-01-01T00:00:00+00:00
     [2021-11-28 17:37:58,403] {dag.py:2395} INFO - Sync 1 DAGs
     [2021-11-28 17:37:58,406] {dag.py:2414} INFO - Creating ORM DAG for 
test_scheduler_keeps_scheduling_pool_full_d2
     [2021-11-28 17:37:58,410] {dag.py:2928} INFO - Setting next_dagrun for 
test_scheduler_keeps_scheduling_pool_full_d2 to 2016-01-01T00:00:00+00:00
     [2021-11-28 17:37:58,429] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
     [2021-11-28 17:37:58,438] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
     [2021-11-28 17:37:58,448] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
     [2021-11-28 17:37:58,461] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
     [2021-11-28 17:37:58,474] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
     [2021-11-28 17:37:58,487] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-28 17:37:58,501] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-28 17:37:58,514] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-28 17:37:58,527] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-28 17:37:58,539] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
   ```
   
   Reload happening between the two DAG syncs (here again d2 was missing):
   
   ```
     ---------------------------- Captured stdout setup 
-----------------------------
     [2021-11-28 13:41:46,975] {dagbag.py:500} INFO - Filling up the DagBag 
from /dev/null
     ------------------------------ Captured log setup 
------------------------------
     INFO     airflow.models.dagbag.DagBag:dagbag.py:500 Filling up the DagBag 
from /dev/null
     ----------------------------- Captured stdout call 
-----------------------------
     [2021-11-28 13:41:46,993] {dag.py:2395} INFO - Sync 1 DAGs
     [2021-11-28 13:41:46,998] {dag.py:2414} INFO - Creating ORM DAG for 
test_scheduler_keeps_scheduling_pool_full_d1
     [2021-11-28 13:41:47,004] {dag.py:2928} INFO - Setting next_dagrun for 
test_scheduler_keeps_scheduling_pool_full_d1 to 2016-01-01T00:00:00+00:00
     [2021-11-28 13:41:47,011] {settings.py:52} INFO - Configured default 
timezone Timezone('UTC')  <-- HERE RELOAD HAPPENS
     [2021-11-28 13:41:47,030] {dag.py:2395} INFO - Sync 1 DAGs
     [2021-11-28 13:41:47,035] {dag.py:2414} INFO - Creating ORM DAG for 
test_scheduler_keeps_scheduling_pool_full_d2
     [2021-11-28 13:41:47,042] {dag.py:2928} INFO - Setting next_dagrun for 
test_scheduler_keeps_scheduling_pool_full_d2 to 2016-01-01T00:00:00+00:00
     [2021-11-28 13:41:47,198] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-28 13:41:47,209] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-28 13:41:47,220] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-28 13:41:47,231] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
     [2021-11-28 13:41:47,241] {scheduler_job.py:992} ERROR - Couldn't find dag 
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
   ```
   
   
   It looks like the problem is not tied to the exact moment of the log line 
itself; I believe it is caused by what happens right after the "Timezone" 
message is logged. When I look at the code, right after re-importing settings 
the DagProcessor calls the `initialize()` method, and there are plenty of 
places where some of those calls (`configure_orm()`?) might disrupt the 
SQLAlchemy models stored in the session.
   
   ```python
   def initialize():
       """Initialize Airflow with all the settings from this file"""
       configure_vars()
       prepare_syspath()
       import_local_settings()
       global LOGGING_CLASS_PATH
       LOGGING_CLASS_PATH = configure_logging()
       configure_adapters()
       # The webservers import this file from models.py with the default settings.
       configure_orm()
       configure_action_logging()

       # Ensure we close DB connections at scheduler and gunicorn worker terminations
       atexit.register(dispose_orm)
   ```
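To make the "reload replaces global state" suspicion concrete: re-running a settings module's body rebinds its module-level globals, so anything still holding a reference to the old objects (e.g. an old `Session` factory) is silently left with a stale one. A stdlib-only sketch (the `fake_settings` module is made up for illustration, this is not Airflow code):

```python
import importlib
import os
import sys
import tempfile
import textwrap

# Write a tiny settings-like module whose top-level code runs on every
# (re)import -- the analogue of airflow.settings logging
# "Configured default timezone" and calling configure_orm().
source = textwrap.dedent(
    """
    print("Configured default timezone (module-level side effect)")
    SESSION = object()  # stands in for the global SQLAlchemy Session factory
    """
)
tmpdir = tempfile.mkdtemp()
with open(os.path.join(tmpdir, "fake_settings.py"), "w") as f:
    f.write(source)
sys.path.insert(0, tmpdir)

import fake_settings

old_session = fake_settings.SESSION  # someone keeps a reference to the factory

importlib.reload(fake_settings)  # the "reload" re-runs the module body

# The module-level global was rebound: the old reference is now stale.
assert fake_settings.SESSION is not old_session
```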
   
   However, I have failed to come up with a theory for why such a reload in a 
spawned process could really cause this. And why is the spawned DagProcessor not 
killed by this pytest fixture?
   
   ```python
           if self.scheduler_job and self.scheduler_job.processor_agent:
               self.scheduler_job.processor_agent.end()
               self.scheduler_job = None
   ```
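For comparison, here is a generic teardown pattern that guarantees the child really is gone (a sketch using a plain stand-in subprocess, not the actual `processor_agent.end()` API): if `end()` only asks the child to stop gracefully, a child that re-ran `initialize()` could simply outlive the fixture.

```python
import subprocess
import sys

# A stand-in child process that would run "forever" unless stopped.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])

# Graceful stop first (roughly what an .end()-style call would do)...
child.terminate()
try:
    child.wait(timeout=5)
except subprocess.TimeoutExpired:
    # ...but fall back to a hard kill so teardown can never leave it behind.
    child.kill()
    child.wait()

# The child has definitely exited at this point.
assert child.returncode is not None
```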
   
   
   Any help and brainstorming appreciated.
   
   

