uranusjr commented on code in PR #41611:
URL: https://github.com/apache/airflow/pull/41611#discussion_r1726197709


##########
airflow/models/taskinstance.py:
##########
@@ -4036,6 +4036,12 @@ def clear_db_references(self, session: Session):
                 )
             )
 
+    def get_upstream_task_ids_by_state(self, state: list[TaskInstanceState], session: Session) -> set[str]:
+        """Get direct upstream tasks ids by state."""
+        task_instances = self.dag_run.get_task_instances(state=state, session=session)
+        dag_task_instances = set(task_instance.task_id for task_instance in task_instances)
+        return dag_task_instances & self.task.upstream_task_ids  # type: ignore[union-attr]

Review Comment:
   It’s probably better to do this the other way around, i.e. get the upstream
task IDs first and then filter them by state. The performance difference is
likely significant if the DAG is large, since `get_task_instances` may need to
fetch many more rows than are actually needed.
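   A minimal sketch of the suggested ordering, assuming a simplified,
hypothetical `task_instance` table and plain `sqlite3` instead of Airflow's
real schema and SQLAlchemy models. The upstream task IDs (passed in here,
standing in for `self.task.upstream_task_ids`) go into the `WHERE` clause, so
the database returns only rows for those tasks rather than every matching task
instance in the run.

```python
import sqlite3
from collections.abc import Iterable


def get_upstream_task_ids_by_state(
    conn: sqlite3.Connection,
    dag_id: str,
    run_id: str,
    upstream_task_ids: Iterable[str],
    states: Iterable[str],
) -> set[str]:
    """Return the direct upstream task IDs whose instance is in one of ``states``.

    Hypothetical helper: the upstream IDs are pushed into the query, so only
    rows for those tasks are read instead of every task instance in the run.
    """
    task_ids = sorted(set(upstream_task_ids))
    wanted_states = sorted(set(states))
    if not task_ids or not wanted_states:
        # Nothing can match, so skip the database round trip entirely.
        return set()
    task_marks = ", ".join(["?"] * len(task_ids))
    state_marks = ", ".join(["?"] * len(wanted_states))
    query = (
        "SELECT task_id FROM task_instance "
        "WHERE dag_id = ? AND run_id = ? "
        f"AND task_id IN ({task_marks}) "
        f"AND state IN ({state_marks})"
    )
    rows = conn.execute(query, [dag_id, run_id, *task_ids, *wanted_states])
    return {task_id for (task_id,) in rows}


# Usage with a hypothetical, simplified table and made-up task names.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("example_dag", "run_1", "extract", "success"),
        ("example_dag", "run_1", "transform", "failed"),
        ("example_dag", "run_1", "load", "success"),
        ("example_dag", "run_1", "report", "success"),
    ],
)
# Assume "load" depends directly on "extract" and "transform".
print(get_upstream_task_ids_by_state(
    conn, "example_dag", "run_1", {"extract", "transform"}, ["success"]
))  # prints {'extract'}
```

   Here "report" is also in `success` but is never fetched, because it is not
an upstream task; in a large DAG that is where the row count saving comes from.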



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
