oboki opened a new pull request, #46303:
URL: https://github.com/apache/airflow/pull/46303
This PR adds a `STRAIGHT_JOIN` hint to prevent an unintended full scan in
`get_sorted_triggers`.
With the MySQL backend, when there are many records in the `trigger` table, the optimizer picks `task_instance` as the driving table and scans it first, which leads to slow queries.
This issue often occurs in the early morning, which might be related to async sensors.
To reproduce this, I first stopped the `triggerer` process so that new triggers would stay unassigned, and then generated some dummy data as shown below:
```python
from airflow.models.trigger import Trigger
from airflow.utils.session import NEW_SESSION, provide_session


@provide_session
def add_trigger(session=NEW_SESSION):
    # Insert 10 dummy triggers; triggerer_id stays NULL because the triggerer is stopped.
    for _ in range(10):
        session.add(Trigger(classpath="", kwargs={"key": "value"}))
    session.commit()


if __name__ == "__main__":
    add_trigger()
```
This produces the following 10 records, none of which has a triggerer assigned:
```sql
MySQL [airflow]> SELECT * FROM `trigger`;
+----+-----------+-----------------------------------------------+----------------------------+--------------+
| id | classpath | kwargs                                        | created_date               | triggerer_id |
+----+-----------+-----------------------------------------------+----------------------------+--------------+
| 41 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982334 |         NULL |
| 42 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982502 |         NULL |
| 43 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982536 |         NULL |
| 44 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982560 |         NULL |
| 45 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982583 |         NULL |
| 46 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982675 |         NULL |
| 47 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982696 |         NULL |
| 48 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982713 |         NULL |
| 49 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982731 |         NULL |
| 50 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982749 |         NULL |
+----+-----------+-----------------------------------------------+----------------------------+--------------+
10 rows in set (0.000 sec)
```
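The execution plan of the AS-IS query shows that MySQL reads `task_instance` first with a full table scan (access type `ALL`, even though `ti_trigger_id` is listed as a possible key) and only then looks up the matching `trigger` rows by primary key. This unnecessary scan slows the query down.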
```sql
MySQL [airflow]> EXPLAIN
    -> SELECT `trigger`.id
    -> FROM `trigger` INNER JOIN task_instance ON `trigger`.id = task_instance.trigger_id
    -> WHERE `trigger`.triggerer_id IS NULL OR (`trigger`.triggerer_id NOT IN (SELECT job.id
    -> FROM job
    -> WHERE job.end_date IS NULL AND job.latest_heartbeat > '2025-01-31 00:00:00' AND job.job_type = 'TriggererJob')) ORDER BY coalesce(task_instance.priority_weight, 0) DESC, `trigger`.created_date
    -> LIMIT 998 FOR UPDATE SKIP LOCKED;
+----+-------------+---------------+------------+--------+------------------------+----------------+---------+----------------------------------+------+----------+----------------------------------------------+
| id | select_type | table         | partitions | type   | possible_keys          | key            | key_len | ref                              | rows | filtered | Extra                                        |
+----+-------------+---------------+------------+--------+------------------------+----------------+---------+----------------------------------+------+----------+----------------------------------------------+
|  1 | PRIMARY     | task_instance | NULL       | ALL    | ti_trigger_id          | NULL           | NULL    | NULL                             | 1354 |   100.00 | Using where; Using temporary; Using filesort |
|  1 | PRIMARY     | trigger       | NULL       | eq_ref | PRIMARY                | PRIMARY        | 4       | airflow.task_instance.trigger_id |    1 |   100.00 | Using where                                  |
|  2 | SUBQUERY    | job           | NULL       | range  | PRIMARY,job_type_heart | job_type_heart | 131     | NULL                             |    3 |    10.00 | Using index condition; Using where           |
+----+-------------+---------------+------------+--------+------------------------+----------------+---------+----------------------------------+------+----------+----------------------------------------------+
3 rows in set, 1 warning (0.001 sec)
```
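MySQL's tabular `EXPLAIN` output lists tables in the order the server reads them, so the first row of the outermost `SELECT` identifies the driving table. A minimal sketch for checking this programmatically is shown below; `join_order` and `full_scans` are hypothetical helper names (not Airflow code), and the sketch assumes the `EXPLAIN` rows have already been fetched as dictionaries (for example via SQLAlchemy's `Result.mappings()`). The sample rows are transcribed from the AS-IS plan above, keeping only the columns the helpers use.

```python
from typing import Any, List, Mapping, Sequence

ExplainRow = Mapping[str, Any]


def join_order(explain_rows: Sequence[ExplainRow]) -> List[str]:
    """Tables of the outermost SELECT (id == 1), in the order MySQL reads them."""
    return [str(row["table"]) for row in explain_rows if row["id"] == 1]


def full_scans(explain_rows: Sequence[ExplainRow]) -> List[str]:
    """Tables that MySQL reads with a full table scan (access type "ALL")."""
    return [str(row["table"]) for row in explain_rows if row["type"] == "ALL"]


# AS-IS plan, reduced to the columns used by the helpers.
as_is_plan = [
    {"id": 1, "table": "task_instance", "type": "ALL", "rows": 1354},
    {"id": 1, "table": "trigger", "type": "eq_ref", "rows": 1},
    {"id": 2, "table": "job", "type": "range", "rows": 3},
]

print("driving table:", join_order(as_is_plan)[0])  # driving table: task_instance
print("full scans:", full_scans(as_is_plan))  # full scans: ['task_instance']
```

A check like this could flag the regression whenever `task_instance` shows up as the driving table.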
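After adding `.prefix_with("STRAIGHT_JOIN", dialect="mysql")`, the compiled SQL starts with `SELECT STRAIGHT_JOIN`, which tells MySQL to join the tables in the order they appear in the `FROM` clause. `trigger` then becomes the driving table, and the execution plan improves as shown below: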
```sql
MySQL [airflow]> EXPLAIN
    -> SELECT STRAIGHT_JOIN `trigger`.id
    -> FROM `trigger` INNER JOIN task_instance ON `trigger`.id = task_instance.trigger_id
    -> WHERE `trigger`.triggerer_id IS NULL OR (`trigger`.triggerer_id NOT IN (SELECT job.id
    -> FROM job
    -> WHERE job.end_date IS NULL AND job.latest_heartbeat > '2025-01-31 00:00:00' AND job.job_type = 'TriggererJob')) ORDER BY coalesce(task_instance.priority_weight, 0) DESC, `trigger`.created_date
    -> LIMIT 998 FOR UPDATE SKIP LOCKED;
+----+--------------------+---------------+------------+-----------------+------------------------+---------+---------+------+------+----------+----------------------------------------------+
| id | select_type        | table         | partitions | type            | possible_keys          | key     | key_len | ref  | rows | filtered | Extra                                        |
+----+--------------------+---------------+------------+-----------------+------------------------+---------+---------+------+------+----------+----------------------------------------------+
|  1 | PRIMARY            | trigger       | NULL       | ALL             | PRIMARY                | NULL    | NULL    | NULL |   10 |   100.00 | Using where; Using temporary; Using filesort |
|  1 | PRIMARY            | task_instance | NULL       | ALL             | ti_trigger_id          | NULL    | NULL    | NULL | 1354 |   100.00 | Using where; Using join buffer (hash join)   |
|  2 | DEPENDENT SUBQUERY | job           | NULL       | unique_subquery | PRIMARY,job_type_heart | PRIMARY | 4       | func |    1 |     5.00 | Using where; Full scan on NULL key           |
+----+--------------------+---------------+------------+-----------------+------------------------+---------+---------+------+------+----------+----------------------------------------------+
3 rows in set, 1 warning (0.001 sec)
```
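The dialect-specific behavior of the hint can be checked without a database by compiling the statement for each dialect. The sketch below assumes SQLAlchemy 1.4 or later and uses simplified, hypothetical `Table` definitions instead of Airflow's models. It also drops the `job` subquery for brevity, so it is an illustration of `prefix_with`, not the actual `get_sorted_triggers` code.

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table, func, select
from sqlalchemy.dialects import mysql, postgresql

metadata = MetaData()

# Hypothetical, simplified stand-ins: only the columns used by the query.
trigger = Table(
    "trigger",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("triggerer_id", Integer, nullable=True),
    Column("created_date", DateTime),
)
task_instance = Table(
    "task_instance",
    metadata,
    Column("id", String(36), primary_key=True),
    Column("trigger_id", Integer),
    Column("priority_weight", Integer),
)


def build_query(limit: int = 998):
    """Simplified sorted-triggers query (the job subquery is omitted)."""
    return (
        select(trigger.c.id)
        # Rendered only when compiling for MySQL; other dialects skip it.
        .prefix_with("STRAIGHT_JOIN", dialect="mysql")
        .select_from(trigger)
        .join(task_instance, trigger.c.id == task_instance.c.trigger_id)
        .where(trigger.c.triggerer_id.is_(None))
        .order_by(
            func.coalesce(task_instance.c.priority_weight, 0).desc(),
            trigger.c.created_date,
        )
        .limit(limit)
        .with_for_update(skip_locked=True)
    )


mysql_sql = str(build_query().compile(dialect=mysql.dialect()))
postgres_sql = str(build_query().compile(dialect=postgresql.dialect()))
print(mysql_sql)
print(postgres_sql)
```

Only the MySQL output should begin with `SELECT STRAIGHT_JOIN`, so compiling against `postgresql.dialect()` is a quick way to confirm the change is MySQL-only.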
With PostgreSQL, the execution plan stays the same and remains efficient regardless of the number of records in `trigger`, as shown below:
```sql
Limit  (cost=113.21..121.68 rows=678 width=28)
  ->  LockRows  (cost=113.21..121.68 rows=678 width=28)
        ->  Sort  (cost=113.21..114.90 rows=678 width=28)
              Sort Key: (COALESCE(task_instance.priority_weight, 0)) DESC, trigger.created_date
              ->  Hash Join  (cost=19.21..81.32 rows=678 width=28)
                    Hash Cond: (task_instance.trigger_id = trigger.id)
                    ->  Seq Scan on task_instance  (cost=0.00..58.55 rows=1355 width=14)
                    ->  Hash  (cost=18.40..18.40 rows=65 width=18)
                          ->  Seq Scan on trigger  (cost=6.78..18.40 rows=65 width=18)
                                Filter: ((triggerer_id IS NULL) OR (NOT (hashed SubPlan 1)))
                                SubPlan 1
                                  ->  Seq Scan on job  (cost=0.00..6.78 rows=1 width=4)
                                        Filter: ((end_date IS NULL) AND (latest_heartbeat > '2025-01-31 00:00:00+00'::timestamp with time zone) AND ((job_type)::text = 'TriggererJob'::text))
```
--
This is an automated message from the Apache Git Service.