sunchao opened a new issue, #41869:
URL: https://github.com/apache/airflow/issues/41869

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.8.3
   
   ### What happened?
   
   Currently, the Airflow `DagFileProcessorManager` and the scheduler each run
in a long-running loop. When the standalone DAG processor is NOT enabled, the
two processes communicate over a Linux pipe (which typically has a capacity
of 64K bytes).
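
The capacity figure can be checked empirically. The following is a minimal sketch, not Airflow code: it assumes a plain `os.pipe()` on a Unix-like system, and `measure_pipe_capacity` is a hypothetical helper name. It fills a non-blocking pipe until the kernel refuses further writes and reports how many bytes were accepted.

```python
import os


def measure_pipe_capacity(chunk_size: int = 1024) -> int:
    """Fill a non-blocking pipe and return the number of bytes it accepted.

    Hypothetical helper for illustration; it uses a raw os.pipe(), not the
    connection object Airflow creates between the two processes.
    """
    read_fd, write_fd = os.pipe()
    try:
        # Non-blocking mode makes a full pipe raise instead of hanging.
        os.set_blocking(write_fd, False)
        chunk = b"\0" * chunk_size
        total = 0
        while True:
            try:
                total += os.write(write_fd, chunk)
            except BlockingIOError:
                return total  # the pipe is full
    finally:
        os.close(read_fd)
        os.close(write_fd)


if __name__ == "__main__":
    print(f"pipe accepted {measure_pipe_capacity()} bytes before filling up")
```

The exact number depends on the operating system and kernel settings, so it is worth measuring on the host where the scheduler actually runs.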
   
   `DagFileProcessorManager` executes `run_parsing_loop`, which roughly:
   1. Lists the DAG directory if necessary and finds all DAG files to be
processed. It also excludes certain files, for instance, those that have been
processed recently.
   2. For each DAG file, launches a separate sub-process to parse the file,
serialize the DAGs in the file, and sync them to the database.
   3. Checks whether there are any messages in the pipe, such as callbacks,
and if so, consumes one message from the pipe.
   
   The scheduler executes `_run_scheduler_loop`, which roughly:
   1. Picks up `N` DAG runs from the database.
   2. Processes these DAG runs, e.g., updating them from queued to running
state, marking failed ones, etc.
   3. Sends callbacks (SLA callback, task failure callback, etc.) to the DAG
processor via the pipe.
   4. Performs a heartbeat.
   
   The DAG processor loop is typically very fast and non-blocking, with the
exception of listing the DAG directory. Normally, the listing should only
happen once in a while (the default is every 5 minutes). However, if the DAG
directory list interval (`dag_dir_list_interval`) is configured too short,
and/or the DAG directory itself contains a large number of files to process,
the DAG directory listing could be triggered in almost every loop iteration,
which means each loop iteration can take a considerable amount of time.
   
   In comparison, the scheduler loop runs much faster. In each iteration, it
could add some new callbacks to the pipe to be processed. Since the DAG
processor consumes only a single message from the pipe in each iteration, the
pipe would gradually fill up and eventually reach the 64K capacity. After this
point, the scheduler is blocked at step 3, waiting for the callbacks to be
sent. Notably, it cannot perform its heartbeat or respond to the liveness
probes.
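
The back-pressure described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions: the pipe is modelled as a counter of equally sized messages, and `cycles_until_blocked` is a hypothetical function, not anything from Airflow. In each DAG processor cycle the scheduler enqueues `produced` messages and the processor then dequeues `consumed` of them.

```python
from typing import Optional


def cycles_until_blocked(capacity: int, produced: int, consumed: int = 1) -> Optional[int]:
    """Count processor cycles that complete before the producer would block.

    Toy model (an assumption, not Airflow's real behaviour): the pipe holds
    at most `capacity` messages; each cycle the scheduler adds `produced`
    messages, then the DAG processor removes up to `consumed` messages.
    Returns None when the consumer keeps up and the pipe never fills.
    """
    if produced <= consumed:
        return None
    backlog = 0
    cycles = 0
    # The producer blocks as soon as its next batch no longer fits.
    while backlog + produced <= capacity:
        backlog += produced
        backlog -= min(backlog, consumed)
        cycles += 1
    return cycles


if __name__ == "__main__":
    # One message consumed per cycle versus draining everything that arrived.
    print(cycles_until_blocked(capacity=10, produced=3, consumed=1))
    print(cycles_until_blocked(capacity=10, produced=3, consumed=3))
```

In this model the backlog grows by `produced - consumed` per cycle, so a consumer that takes only one message per cycle falls behind whenever the scheduler sends more than one, regardless of how large the pipe is.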
   
   There are a few potential consequences of the above. First, the scheduler
loop could get stuck, causing DAGs not to be scheduled soon enough. Second,
on Kubernetes, because the scheduler cannot answer the liveness probe, it
would be repeatedly killed by K8S and restarted, which is not desirable.
Third, if the number of files to be parsed is too large in comparison to the
DAG listing duration, then because the ordering of the returned file list
could remain relatively the same in each iteration, the top subset of files
could be processed repeatedly while the files at the bottom of the list are
picked up only after a certain time, therefore increasing the total parsing
time.
   
   Internally, we encountered this issue when setting `dag_dir_list_interval`
to 30 seconds while the total parsing time for the DAG directory was more than
30 seconds.
   
   
   ### What you think should happen instead?
   
   It would be nice if:
   1. Instead of consuming a single message each time, the DAG processor
drained all the messages in the pipe at that moment. This would reduce the
risk of the scheduler being blocked.
   2. Metrics were added for DAG listing time. Currently this information is
only available in `dag_processor_manager.log`.
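
A minimal sketch of both suggestions, assuming the processor side holds a `multiprocessing.connection.Connection` for the pipe. The names `drain_messages` and `timed` are hypothetical helpers for illustration and are not part of Airflow; reporting the elapsed time through a plain callback is likewise an assumption, and a real change would presumably forward it to whatever metrics client the deployment uses.

```python
import multiprocessing
import time
from contextlib import contextmanager


def drain_messages(conn, max_messages=None):
    """Return every message currently readable on `conn` without blocking.

    Hypothetical helper; `max_messages` optionally bounds the work per loop.
    """
    messages = []
    # poll() with no timeout returns immediately, so this never waits for
    # messages that have not been sent yet.
    while conn.poll():
        try:
            messages.append(conn.recv())
        except EOFError:
            break  # the other end was closed and nothing is left to read
        if max_messages is not None and len(messages) >= max_messages:
            break
    return messages


@contextmanager
def timed(report):
    """Measure the wall-clock duration of a block and pass it to `report`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        report(time.perf_counter() - start)


if __name__ == "__main__":
    scheduler_end, processor_end = multiprocessing.Pipe()
    for i in range(3):
        scheduler_end.send(f"callback-{i}")
    # All three pending messages are consumed in a single pass.
    print(drain_messages(processor_end))

    durations = []
    with timed(durations.append):
        time.sleep(0.01)  # stand-in for listing the DAG directory
    print(f"listing took {durations[0]:.3f}s")
```

The optional `max_messages` bound is a design choice: draining without a limit frees the scheduler fastest, while a cap keeps a single loop iteration from being dominated by callback handling.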
   
   ### How to reproduce
   
   It may be reproducible by:
   1. Having a DAG directory with many files so the DAG listing time is
non-trivial.
   2. Setting `dag_dir_list_interval` to something small that is less than
the listing time.
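
For step 1, a throwaway script along these lines could populate a test DAG directory. It is a sketch only: `write_dummy_dags`, the file naming scheme, and the contents of the generated DAGs are all assumptions made for illustration, and the number of files needed to make listing slow will depend on the environment.

```python
import tempfile
from pathlib import Path
from typing import List

# Body of each generated DAG file (illustrative; any trivial DAG would do).
DAG_TEMPLATE = '''\
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="{dag_id}",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    EmptyOperator(task_id="noop")
'''


def write_dummy_dags(target_dir: Path, count: int) -> List[Path]:
    """Write `count` trivial DAG files into `target_dir` and return their paths."""
    target_dir.mkdir(parents=True, exist_ok=True)
    paths = []
    for i in range(count):
        dag_id = f"dummy_dag_{i:05d}"
        path = target_dir / f"{dag_id}.py"
        path.write_text(DAG_TEMPLATE.format(dag_id=dag_id))
        paths.append(path)
    return paths


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        created = write_dummy_dags(Path(tmp) / "dags", count=1000)
        print(f"wrote {len(created)} DAG files under {created[0].parent}")
```

The generator itself does not import Airflow; only the files it writes do, so it can be run anywhere and the output copied into the DAG folder of a test deployment.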
   
   
   
   ### Operating System
   
   Debian GNU/Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

