ashb commented on a change in pull request #11147:
URL: https://github.com/apache/airflow/pull/11147#discussion_r495139273



##########
File path: airflow/models/taskinstance.py
##########
@@ -1811,20 +1811,34 @@ def filter_for_tis(
         """Returns SQLAlchemy filter to query selected task instances"""
         if not tis:
             return None
-        if all(isinstance(t, TaskInstanceKey) for t in tis):
-            filter_for_tis = ([and_(TaskInstance.dag_id == tik.dag_id,
-                                    TaskInstance.task_id == tik.task_id,
-                                    TaskInstance.execution_date == 
tik.execution_date)
-                               for tik in tis])
-            return or_(*filter_for_tis)
-        if all(isinstance(t, TaskInstance) for t in tis):
-            filter_for_tis = ([and_(TaskInstance.dag_id == ti.dag_id,
-                                    TaskInstance.task_id == ti.task_id,
-                                    TaskInstance.execution_date == 
ti.execution_date)
-                               for ti in tis])
-            return or_(*filter_for_tis)
-
-        raise TypeError("All elements must have the same type: `TaskInstance` 
or `TaskInstanceKey`.")
+
+        # DictKeys type, (what we often pass here from the scheduler) is not 
directly indexable :(
+        first = list(tis)[0]
+
+        dag_id = first.dag_id
+        execution_date = first.execution_date
+        first_task_id = first.task_id
+        # Common path optimisations: when all TIs are for the same dag_id and 
execution_date, or same dag_id
+        # and task_id -- this can be over 150x for huge numbers of TIs (20k+)
+        if all(t.dag_id == dag_id and t.execution_date == execution_date for t 
in tis):
+            return and_(
+                TaskInstance.dag_id == dag_id,
+                TaskInstance.execution_date == execution_date,
+                TaskInstance.task_id.in_(t.task_id for t in tis),
+            )
+        if all(t.dag_id == dag_id and t.task_id == first_task_id for t in tis):

Review comment:
       I'm not sure how common this case is, and if it's situational/only in 
our tests, but there was a case from tests when scheduling the same task on 10 
dag runs -- it doesn't seem expensive to check this (as soon as the first 
different task id is found it'll stop) -- so excluding "degenerate" case where 
999 tasks have the same task_id and task 1000 has a different one this'll be 
fine. And even then this is all in memory so should be "fast enough".
   
   WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to