oboki opened a new pull request, #46303:
URL: https://github.com/apache/airflow/pull/46303

   This PR adds a MySQL-only `STRAIGHT_JOIN` hint to the query in `get_sorted_triggers` to prevent an unintended full table scan.
   
   With a MySQL backend, when there are many records in the `trigger` table, the optimizer picks `task_instance` as the driving table and scans it first, which makes the query slow.
   
   This happens frequently in the early morning, possibly because of async sensors.
   
   To reproduce it, I first stopped the `triggerer` process so that new triggers stay unassigned, and then generated some dummy data as shown below:
   
   ```python
   from airflow.models.trigger import Trigger
   from airflow.utils.session import NEW_SESSION, provide_session


   @provide_session
   def add_trigger(session=NEW_SESSION):
       # Insert 10 dummy triggers. With the triggerer stopped, none of them
       # is picked up, so their triggerer_id stays NULL.
       for _ in range(10):
           session.add(Trigger(classpath="", kwargs={"key": "value"}))
       session.commit()


   if __name__ == "__main__":
       add_trigger()
   ```
   
   This yields the following 10 records, none of which is assigned to a triggerer:
   
   ```sql
   MySQL [airflow]> SELECT * FROM `trigger`;

   +----+-----------+-----------------------------------------------+----------------------------+--------------+
   | id | classpath | kwargs                                        | created_date               | triggerer_id |
   +----+-----------+-----------------------------------------------+----------------------------+--------------+
   | 41 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982334 |         NULL |
   | 42 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982502 |         NULL |
   | 43 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982536 |         NULL |
   | 44 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982560 |         NULL |
   | 45 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982583 |         NULL |
   | 46 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982675 |         NULL |
   | 47 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982696 |         NULL |
   | 48 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982713 |         NULL |
   | 49 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982731 |         NULL |
   | 50 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982749 |         NULL |
   +----+-----------+-----------------------------------------------+----------------------------+--------------+
   10 rows in set (0.000 sec)
   ```
   
   The execution plan of the AS-IS query shows that MySQL reads `task_instance` first with a full table scan (`type` is `ALL`, about 1354 rows) and only then looks up `trigger` by primary key. This unnecessary scan is what slows the query down.
   
   ```sql
   MySQL [airflow]> EXPLAIN
       -> SELECT `trigger`.id
       -> FROM `trigger` INNER JOIN task_instance ON `trigger`.id = task_instance.trigger_id
       -> WHERE `trigger`.triggerer_id IS NULL OR (`trigger`.triggerer_id NOT IN (SELECT job.id
       -> FROM job
       -> WHERE job.end_date IS NULL AND job.latest_heartbeat > '2025-01-31 00:00:00' AND job.job_type = 'TriggererJob')) ORDER BY coalesce(task_instance.priority_weight, 0) DESC, `trigger`.created_date
       ->  LIMIT 998 FOR UPDATE SKIP LOCKED;

   +----+-------------+---------------+------------+--------+------------------------+----------------+---------+----------------------------------+------+----------+----------------------------------------------+
   | id | select_type | table         | partitions | type   | possible_keys          | key            | key_len | ref                              | rows | filtered | Extra                                        |
   +----+-------------+---------------+------------+--------+------------------------+----------------+---------+----------------------------------+------+----------+----------------------------------------------+
   |  1 | PRIMARY     | task_instance | NULL       | ALL    | ti_trigger_id          | NULL           | NULL    | NULL                             | 1354 |   100.00 | Using where; Using temporary; Using filesort |
   |  1 | PRIMARY     | trigger       | NULL       | eq_ref | PRIMARY                | PRIMARY        | 4       | airflow.task_instance.trigger_id |    1 |   100.00 | Using where                                  |
   |  2 | SUBQUERY    | job           | NULL       | range  | PRIMARY,job_type_heart | job_type_heart | 131     | NULL                             |    3 |    10.00 | Using index condition; Using where           |
   +----+-------------+---------------+------------+--------+------------------------+----------------+---------+----------------------------------+------+----------+----------------------------------------------+
   3 rows in set, 1 warning (0.001 sec)
   ```
   
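   After adding `.prefix_with("STRAIGHT_JOIN", dialect="mysql")`, the compiled SQL starts with `SELECT STRAIGHT_JOIN`, which makes MySQL join the tables in the order they are listed in the `FROM` clause. The execution plan now reads `trigger` (10 rows) first and joins `task_instance` to it with a hash join: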
   
   ```sql
   MySQL [airflow]> EXPLAIN
       -> SELECT STRAIGHT_JOIN `trigger`.id
       -> FROM `trigger` INNER JOIN task_instance ON `trigger`.id = task_instance.trigger_id
       -> WHERE `trigger`.triggerer_id IS NULL OR (`trigger`.triggerer_id NOT IN (SELECT job.id
       -> FROM job
       -> WHERE job.end_date IS NULL AND job.latest_heartbeat > '2025-01-31 00:00:00' AND job.job_type = 'TriggererJob')) ORDER BY coalesce(task_instance.priority_weight, 0) DESC, `trigger`.created_date
       ->  LIMIT 998 FOR UPDATE SKIP LOCKED;

   +----+--------------------+---------------+------------+-----------------+------------------------+---------+---------+------+------+----------+----------------------------------------------+
   | id | select_type        | table         | partitions | type            | possible_keys          | key     | key_len | ref  | rows | filtered | Extra                                        |
   +----+--------------------+---------------+------------+-----------------+------------------------+---------+---------+------+------+----------+----------------------------------------------+
   |  1 | PRIMARY            | trigger       | NULL       | ALL             | PRIMARY                | NULL    | NULL    | NULL |   10 |   100.00 | Using where; Using temporary; Using filesort |
   |  1 | PRIMARY            | task_instance | NULL       | ALL             | ti_trigger_id          | NULL    | NULL    | NULL | 1354 |   100.00 | Using where; Using join buffer (hash join)   |
   |  2 | DEPENDENT SUBQUERY | job           | NULL       | unique_subquery | PRIMARY,job_type_heart | PRIMARY | 4       | func |    1 |     5.00 | Using where; Full scan on NULL key           |
   +----+--------------------+---------------+------------+-----------------+------------------------+---------+---------+------+------+----------+----------------------------------------------+
   3 rows in set, 1 warning (0.001 sec)
   ```
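
To see how the hint is scoped, the query can be rebuilt with SQLAlchemy Core and compiled for both dialects. This is a minimal sketch under stated assumptions: the three tables are simplified stand-ins for Airflow's models (only the columns the query touches), and `build_sorted_triggers_query` is a hypothetical helper that mirrors the shape of the SQL in the EXPLAIN output above, not the actual code of `get_sorted_triggers`. Because `prefix_with` is given `dialect="mysql"`, SQLAlchemy renders `STRAIGHT_JOIN` only when compiling for MySQL.

```python
# Minimal sketch (assumption): simplified stand-ins for Airflow's tables,
# keeping only the columns used by the query in the EXPLAIN output.
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table, func, or_, select
from sqlalchemy.dialects import mysql, postgresql

metadata = MetaData()

trigger = Table(
    "trigger",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("triggerer_id", Integer, nullable=True),
    Column("created_date", DateTime),
)
task_instance = Table(
    "task_instance",
    metadata,
    Column("id", Integer, primary_key=True),  # simplified key, not Airflow's real PK
    Column("trigger_id", Integer),
    Column("priority_weight", Integer),
)
job = Table(
    "job",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("end_date", DateTime),
    Column("latest_heartbeat", DateTime),
    Column("job_type", String(30)),
)


def build_sorted_triggers_query(heartbeat_cutoff, capacity):
    """Hypothetical helper mirroring the shape of the SQL in the EXPLAIN output."""
    # Triggerer jobs that are still alive: no end date and a recent heartbeat.
    alive_triggerers = select(job.c.id).where(
        job.c.end_date.is_(None),
        job.c.latest_heartbeat > heartbeat_cutoff,
        job.c.job_type == "TriggererJob",
    )
    return (
        select(trigger.c.id)
        # Rendered only when compiling for the "mysql" dialect.
        .prefix_with("STRAIGHT_JOIN", dialect="mysql")
        .select_from(trigger.join(task_instance, trigger.c.id == task_instance.c.trigger_id))
        .where(
            or_(
                trigger.c.triggerer_id.is_(None),
                trigger.c.triggerer_id.not_in(alive_triggerers),
            )
        )
        .order_by(
            func.coalesce(task_instance.c.priority_weight, 0).desc(),
            trigger.c.created_date,
        )
        .limit(capacity)
        .with_for_update(skip_locked=True)
    )


stmt = build_sorted_triggers_query(datetime(2025, 1, 31), capacity=998)
# Compiling to a string does not require a live database connection.
mysql_sql = str(stmt.compile(dialect=mysql.dialect()))
postgres_sql = str(stmt.compile(dialect=postgresql.dialect()))
print(mysql_sql)     # contains "SELECT STRAIGHT_JOIN"
print(postgres_sql)  # same query without the STRAIGHT_JOIN prefix
```

Restricting the prefix to one dialect keeps the change local to MySQL, so the PostgreSQL plan is left to its own planner.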
   
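   On PostgreSQL, the hint is not emitted because it is restricted to the `mysql` dialect, and the execution plan stays the same and efficient regardless of the number of rows in `trigger`: the planner builds a hash from the filtered `trigger` rows and probes it while scanning `task_instance`, as shown below: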
   
   ```sql
    Limit  (cost=113.21..121.68 rows=678 width=28)
      ->  LockRows  (cost=113.21..121.68 rows=678 width=28)
            ->  Sort  (cost=113.21..114.90 rows=678 width=28)
                  Sort Key: (COALESCE(task_instance.priority_weight, 0)) DESC, trigger.created_date
                  ->  Hash Join  (cost=19.21..81.32 rows=678 width=28)
                        Hash Cond: (task_instance.trigger_id = trigger.id)
                        ->  Seq Scan on task_instance  (cost=0.00..58.55 rows=1355 width=14)
                        ->  Hash  (cost=18.40..18.40 rows=65 width=18)
                              ->  Seq Scan on trigger  (cost=6.78..18.40 rows=65 width=18)
                                    Filter: ((triggerer_id IS NULL) OR (NOT (hashed SubPlan 1)))
                                    SubPlan 1
                                      ->  Seq Scan on job  (cost=0.00..6.78 rows=1 width=4)
                                            Filter: ((end_date IS NULL) AND (latest_heartbeat > '2025-01-31 00:00:00+00'::timestamp with time zone) AND ((job_type)::text = 'TriggererJob'::text))
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to