zhiyong-ong commented on code in PR #27102:
URL: https://github.com/apache/airflow/pull/27102#discussion_r1011803909


##########
airflow/models/taskinstance.py:
##########
@@ -2503,34 +2503,78 @@ def filter_for_tis(tis: Iterable[TaskInstance | 
TaskInstanceKey]) -> BooleanClau
         run_id = first.run_id
         map_index = first.map_index
         first_task_id = first.task_id
+
+        # pre-compute the set of dag_id, run_id, map_indices and task_ids
+        dag_ids, run_ids, map_indices, task_ids = set(), set(), set(), set()
+        for t in tis:
+            dag_ids.add(t.dag_id)
+            run_ids.add(t.run_id)
+            map_indices.add(t.map_index)
+            task_ids.add(t.task_id)
+
         # Common path optimisations: when all TIs are for the same dag_id and 
run_id, or same dag_id
         # and task_id -- this can be over 150x faster for huge numbers of TIs 
(20k+)
-        if all(t.dag_id == dag_id and t.run_id == run_id and t.map_index == 
map_index for t in tis):
+        if dag_ids == {dag_id} and run_ids == {run_id} and map_indices == 
{map_index}:
             return and_(
                 TaskInstance.dag_id == dag_id,
                 TaskInstance.run_id == run_id,
                 TaskInstance.map_index == map_index,
-                TaskInstance.task_id.in_(t.task_id for t in tis),
+                TaskInstance.task_id.in_(task_ids),
             )
-        if all(t.dag_id == dag_id and t.task_id == first_task_id and 
t.map_index == map_index for t in tis):
+        if dag_ids == {dag_id} and task_ids == {first_task_id} and map_indices 
== {map_index}:
             return and_(
                 TaskInstance.dag_id == dag_id,
-                TaskInstance.run_id.in_(t.run_id for t in tis),
+                TaskInstance.run_id.in_(run_ids),
                 TaskInstance.map_index == map_index,
                 TaskInstance.task_id == first_task_id,
             )
-        if all(t.dag_id == dag_id and t.run_id == run_id and t.task_id == 
first_task_id for t in tis):
+        if dag_ids == {dag_id} and run_ids == {run_id} and task_ids == 
{first_task_id}:
             return and_(
                 TaskInstance.dag_id == dag_id,
                 TaskInstance.run_id == run_id,
-                TaskInstance.map_index.in_(t.map_index for t in tis),
+                TaskInstance.map_index.in_(map_indices),
                 TaskInstance.task_id == first_task_id,
             )

Review Comment:
   This is a very small optimisation in the grand scheme of things, but it does 
improve the speed a little (the vast majority of the optimisation is below, 
in the improved SQL query).
   
   Instead of repeatedly iterating through `tis` in 3 different conditional 
statements to compare each attribute with a static value, we pre-compute 
those attributes into sets in a single pass and compare each set with a 
static set.
   
   Some small benchmarks on this:
   ```
   In [1]: x = [5 for i in range(1000000)]
   
   In [2]: %timeit all(i == 5 for i in x)
   43.6 ms ± 2.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
   
   In [3]: %timeit set(x) == {5}
   8.02 ms ± 179 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
   ```
   
   Clearing tasks that fulfil the 3rd condition of these 3 fast paths 
gives me:
   
   old
   ```
   [2022-11-02 13:48:27,970] {views.py:2017} INFO - Number of tasks to clear: 
500
   [2022-11-02 13:48:27,970] {views.py:2018} INFO - Time taken: 
0.08926955598872155
   ```
   new
   ```
   [2022-11-02 13:52:04,373] {views.py:2017} INFO - Number of tasks to clear: 
500
   [2022-11-02 13:52:04,373] {views.py:2018} INFO - Time taken: 
0.060697362991049886
   ``` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to