uranusjr commented on code in PR #27102:
URL: https://github.com/apache/airflow/pull/27102#discussion_r1011252348


##########
airflow/models/taskinstance.py:
##########
@@ -2503,34 +2503,78 @@ def filter_for_tis(tis: Iterable[TaskInstance | 
TaskInstanceKey]) -> BooleanClau
         run_id = first.run_id
         map_index = first.map_index
         first_task_id = first.task_id
+
+        # pre-compute the set of dag_id, run_id, map_indices and task_ids
+        dag_ids, run_ids, map_indices, task_ids = set(), set(), set(), set()
+        for t in tis:
+            dag_ids.add(t.dag_id)
+            run_ids.add(t.run_id)
+            map_indices.add(t.map_index)
+            task_ids.add(t.task_id)
+
         # Common path optimisations: when all TIs are for the same dag_id and 
run_id, or same dag_id
         # and task_id -- this can be over 150x faster for huge numbers of TIs 
(20k+)
-        if all(t.dag_id == dag_id and t.run_id == run_id and t.map_index == 
map_index for t in tis):
+        if dag_ids == {dag_id} and run_ids == {run_id} and map_indices == 
{map_index}:
             return and_(
                 TaskInstance.dag_id == dag_id,
                 TaskInstance.run_id == run_id,
                 TaskInstance.map_index == map_index,
-                TaskInstance.task_id.in_(t.task_id for t in tis),
+                TaskInstance.task_id.in_(task_ids),
             )
-        if all(t.dag_id == dag_id and t.task_id == first_task_id and 
t.map_index == map_index for t in tis):
+        if dag_ids == {dag_id} and task_ids == {first_task_id} and map_indices 
== {map_index}:
             return and_(
                 TaskInstance.dag_id == dag_id,
-                TaskInstance.run_id.in_(t.run_id for t in tis),
+                TaskInstance.run_id.in_(run_ids),
                 TaskInstance.map_index == map_index,
                 TaskInstance.task_id == first_task_id,
             )
-        if all(t.dag_id == dag_id and t.run_id == run_id and t.task_id == 
first_task_id for t in tis):
+        if dag_ids == {dag_id} and run_ids == {run_id} and task_ids == 
{first_task_id}:
             return and_(
                 TaskInstance.dag_id == dag_id,
                 TaskInstance.run_id == run_id,
-                TaskInstance.map_index.in_(t.map_index for t in tis),
+                TaskInstance.map_index.in_(map_indices),
                 TaskInstance.task_id == first_task_id,
             )

Review Comment:
   I feel these three fast paths shouldn’t be changed; the new code does not
_look_ faster to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to