michaelmicheal opened a new issue, #25627: URL: https://github.com/apache/airflow/issues/25627
### Apache Airflow version

Other Airflow 2 version

### What happened

Airflow Version: 2.2.5
MySQL Version: 8.0.18

In the Scheduler, we are seeing cases where MySQL chooses an inefficient plan for the [critical section task queuing query](https://github.com/apache/airflow/blob/2.2.5/airflow/jobs/scheduler_job.py#L294-L303). When a large number of task instances are scheduled, MySQL fails to use the `ti_state` index to filter the `task_instance` table. It scans the `dag` table and, for each unpaused DAG, reads all of that DAG's task instances through the primary key. In effect this is a full table scan (about 7.3 million rows).

Normally, when running the critical section query, the index on `task_instance.state` is used to filter scheduled task instances:

```bash
-> Limit: 512 row(s) (actual time=5.290..5.413 rows=205 loops=1)
    -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk (actual time=5.289..5.391 rows=205 loops=1)
        -> Table scan on <temporary> (actual time=0.003..0.113 rows=205 loops=1)
            -> Temporary table (actual time=5.107..5.236 rows=205 loops=1)
                -> Nested loop inner join (cost=20251.99 rows=1741) (actual time=0.100..4.242 rows=205 loops=1)
                    -> Nested loop inner join (cost=161.70 rows=12) (actual time=0.071..2.436 rows=205 loops=1)
                        -> Index lookup on task_instance using ti_state (state='scheduled') (cost=80.85 rows=231) (actual time=0.051..1.992 rows=222 loops=1)
                        -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running')) (cost=0.25 rows=0) (actual time=0.002..0.002 rows=1 loops=222)
                            -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id) (cost=0.25 rows=1) (actual time=0.001..0.001 rows=1 loops=222)
                    -> Filter: ((dag.is_paused = 0) and (task_instance.dag_id = dag.dag_id)) (cost=233.52 rows=151) (actual time=0.008..0.008 rows=1 loops=205)
                        -> Index range scan on dag (re-planned for each iteration) (cost=233.52 rows=15072) (actual time=0.008..0.008 rows=1 loops=205)
1 row in set, 1 warning (0.03 sec)
```

When a large number of task instances are in the `scheduled` state at the same time, the index on `task_instance.state` is not used to filter scheduled task instances:

```bash
-> Limit: 512 row(s) (actual time=12110.251..12110.573 rows=512 loops=1)
    -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk (actual time=12110.250..12110.526 rows=512 loops=1)
        -> Table scan on <temporary> (actual time=0.005..0.800 rows=1176 loops=1)
            -> Temporary table (actual time=12109.022..12109.940 rows=1176 loops=1)
                -> Nested loop inner join (cost=10807.83 rows=3) (actual time=1.328..12097.528 rows=1176 loops=1)
                    -> Nested loop inner join (cost=10785.34 rows=64) (actual time=1.293..12084.371 rows=1193 loops=1)
                        -> Filter: (dag.is_paused = 0) (cost=1371.40 rows=1285) (actual time=0.087..22.409 rows=13264 loops=1)
                            -> Table scan on dag (cost=1371.40 rows=12854) (actual time=0.085..15.796 rows=13508 loops=1)
                        -> Filter: ((task_instance.state = 'scheduled') and (task_instance.dag_id = dag.dag_id)) (cost=0.32 rows=0) (actual time=0.907..0.909 rows=0 loops=13264)
                            -> Index lookup on task_instance using PRIMARY (dag_id=dag.dag_id) (cost=0.32 rows=70) (actual time=0.009..0.845 rows=553 loops=13264)
                    -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running')) (cost=0.25 rows=0) (actual time=0.010..0.011 rows=1 loops=1193)
                        -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id) (cost=0.25 rows=1) (actual time=0.009..0.010 rows=1 loops=1193)
1 row in set, 1 warning (12.14 sec)
```

### What you think should happen instead

To resolve this, I patched `scheduler_job.py` to add a MySQL index hint (`USE INDEX (ti_state)`) on the `task_instance` table.
```diff
--- /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
+++ /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
@@ -293,6 +293,7 @@ class SchedulerJob(BaseJob):
         # and the dag is not paused
         query = (
             session.query(TI)
+            .with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
             .join(TI.dag_run)
             .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
             .join(TI.dag_model)
```

I think it makes sense to add this index hint upstream.

### How to reproduce

Schedule a large number of DAG runs and tasks in a short period of time.

### Operating System

Debian GNU/Linux 10 (buster)

### Versions of Apache Airflow Providers

_No response_

### Deployment

Other 3rd-party Helm chart

### Deployment details

Airflow 2.2.5 on Kubernetes
MySQL Version: 8.0.18

### Anything else

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
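The patch relies on SQLAlchemy's `with_hint()`. This attaches the hint text to the `task_instance` FROM entry and renders it only when the statement is compiled for the named dialect, so non-MySQL metadata databases should not see it. Below is a minimal sketch of that behaviour. It assumes SQLAlchemy 1.4 or 2.0 and uses Core `select()` instead of the ORM `session.query(TI)` from the patch. The `task_instance` table is a simplified, hypothetical stand-in with just enough columns to carry a `ti_state` index. It is not Airflow's real schema.

```python
from sqlalchemy import Column, Index, MetaData, String, Table, select
from sqlalchemy.dialects import mysql, sqlite

metadata = MetaData()

# Hypothetical, simplified stand-in for Airflow's task_instance table
# (NOT the real schema): a few key columns plus a ti_state index on `state`.
task_instance = Table(
    "task_instance",
    metadata,
    Column("dag_id", String(250), primary_key=True),
    Column("task_id", String(250), primary_key=True),
    Column("run_id", String(250), primary_key=True),
    Column("state", String(20)),
    Index("ti_state", "state"),
)

# Same idea as the patch: the hint is scoped to the MySQL dialect only.
stmt = (
    select(task_instance.c.dag_id, task_instance.c.task_id)
    .where(task_instance.c.state == "scheduled")
    .with_hint(task_instance, "USE INDEX (ti_state)", dialect_name="mysql")
)

# Compile for two dialects without connecting to any database.
mysql_sql = str(stmt.compile(dialect=mysql.dialect()))
sqlite_sql = str(stmt.compile(dialect=sqlite.dialect()))

print(mysql_sql)   # FROM clause carries the USE INDEX hint
print(sqlite_sql)  # hint omitted for a non-MySQL dialect
```

When compiled for MySQL, the FROM clause reads `FROM task_instance USE INDEX (ti_state)`. When compiled for SQLite, the hint is dropped. `USE INDEX` only limits which indexes the optimizer considers. MySQL's `FORCE INDEX` is the stronger variant, which additionally treats a table scan as very expensive.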

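The "about 7.3 million rows" figure under What happened can be recovered from the slow plan's own numbers. The short calculation below assumes that, for iterators with `loops > 1`, `rows` and `actual time` are per-loop averages. The plan is consistent with that reading: the `task_instance` filter reports `rows=0` per loop while its parent join returns 1193 rows. The constant names are illustrative only.

```python
# Figures copied from the slow EXPLAIN ANALYZE plan in this issue.
# Assumption: for loops > 1, MySQL reports rows/time as per-loop averages,
# so totals are (average per loop) * loops.
DAG_LOOPS = 13264          # unpaused DAGs driving the outer loop (rows=13264)
TI_ROWS_PER_LOOP = 553     # "Index lookup on task_instance using PRIMARY", rows=553
TI_MS_PER_LOOP = 0.909     # task_instance filter, actual time ..0.909 ms per loop
FAST_PLAN_TI_ROWS = 222    # rows read via ti_state in the normal (fast) plan

# Total task_instance rows touched and time spent in the per-DAG lookups.
rows_read = DAG_LOOPS * TI_ROWS_PER_LOOP
seconds = DAG_LOOPS * TI_MS_PER_LOOP / 1000.0

print(f"task_instance rows read: {rows_read:,}")
print(f"time in task_instance lookups: {seconds:.2f} s")
print(f"rows read vs. ti_state plan: {rows_read / FAST_PLAN_TI_ROWS:,.0f}x")
```

The result is roughly 7.3 million rows read and about 12.06 s spent in the per-DAG `task_instance` lookups. That accounts for nearly all of the 12.14 s query time, compared with 222 rows read through `ti_state` in the normal plan.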