sunchao opened a new issue, #41869: URL: https://github.com/apache/airflow/issues/41869
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.8.3

### What happened?

Currently, the Airflow `DagFileProcessorManager` and the scheduler each run in a long-running loop. When the standalone DAG processor is NOT enabled, the two processes communicate over a Linux pipe (which typically has a capacity of 64 KiB).

`DagFileProcessorManager` executes `run_parsing_loop`, which roughly:

1. Lists the DAG directory if necessary and finds all DAG files to be processed. It also excludes certain files, for instance those that have been processed recently.
2. For each DAG file, launches a separate sub-process to parse the file, serialize the DAGs in it, and sync them to the database.
3. Checks whether there are any messages in the pipe, such as callbacks; if so, consumes one message from the pipe.

The scheduler executes `_run_scheduler_loop`, which roughly:

1. Picks up `N` DAG runs from the database.
2. Processes these DAG runs, e.g., moving them from queued to running state, marking failed ones, etc.
3. Sends callbacks (SLA callback, task failure callback, etc.) to the DAG processor via the pipe.
4. Performs a heartbeat.

The DAG processor loop is typically very fast and non-blocking, with the exception of listing the DAG directory. Normally, the listing should only happen once in a while (default: every 5 minutes). However, if the DAG directory list interval (`dag_dir_list_interval`) is configured too short, and/or the DAG directory itself contains a large number of files to process, the DAG directory listing can be triggered in almost every loop iteration, which means a loop iteration can take a considerable amount of time. In comparison, the scheduler loop runs much faster, and in each iteration it can add new callbacks to the pipe to be processed.
Since the DAG processor consumes only a single message from the pipe per iteration, the pipe gradually fills up and eventually reaches its 64 KiB capacity. After this point, the scheduler blocks at step 3, waiting for the callbacks to be sent. Notably, it cannot perform heartbeats or respond to liveness probes.

There are several potential consequences. First, the scheduler loop can get stuck, causing DAGs not to be scheduled soon enough. Second, on Kubernetes, because the scheduler cannot answer the liveness probe, it is repeatedly killed and restarted by K8s, which is not desirable. Third, if the number of files to be parsed is large relative to the DAG listing duration, the ordering of the returned file list can remain roughly the same across iterations, so the top subset of files is processed repeatedly while files at the bottom of the list are not picked up until much later, increasing the total parsing time.

Internally, we encountered this issue when setting `dag_dir_list_interval` to 30 seconds while the total parsing time for the DAG directory was more than 30 seconds.

### What you think should happen instead?

It would be nice if:

1. Instead of consuming a single message each time, the DAG processor drained all the messages currently in the pipe. This would reduce the risk of the scheduler being blocked.
2. A metric were added for the DAG listing time. Currently this information is only available in `dag_processor_manager.log`.

### How to reproduce

It may be reproducible by:

1. Having a DAG directory with many files, so the DAG listing time is non-trivial.
2. Setting `dag_dir_list_interval` to something small, i.e., less than the listing time.

### Operating System

Debian GNU/Linux

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?
_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
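For reference, proposal 1 above (draining the pipe instead of consuming one message per iteration) could be sketched as follows. This is a simplified illustration, not a patch against `DagFileProcessorManager`; the function name is hypothetical:

```python
def drain_pipe(conn):
    """Consume every message currently buffered in the pipe, rather than
    just one, so the scheduler's send() side is far less likely to block
    on a full pipe buffer."""
    messages = []
    while conn.poll():  # non-blocking: returns False once the pipe is empty
        messages.append(conn.recv())
    return messages
```

Proposal 2 would additionally emit a timer metric for the DAG directory listing duration (the exact metric name would need to follow Airflow's existing `dag_processing.*` naming conventions).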
