ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364239180
 
 

 ##########
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##########
 @@ -34,11 +34,30 @@ class TriggerRuleDep(BaseTIDep):
     IGNOREABLE = True
     IS_TASK_DEP = True
 
+    @staticmethod
+    @provide_session
+    def _get_states_count_upstream_ti(ti, finished_tasks, session):
+        """
+        This function returns the states of the upstream tis for a specific ti 
in order to determine
+        whether this ti can run in this iteration
+
+        :param ti: the ti that we want to calculate deps for
+        :type ti: airflow.models.TaskInstance
+        :param finished_tasks: all the finished tasks of the dag_run
+        :type finished_tasks: list[airflow.models.TaskInstance]
+        """
+        if not finished_tasks:
+            # this is for the strange feature of running tasks without dag_run
+            finished_tasks = [t for t in 
ti.task.dag.get_task_instances(start_date=ti.execution_date,
+                                                                        
end_date=ti.execution_date)
 
 Review comment:
   We could do the filtering in the DB here?
   
   ```suggestion
                                                                           
end_date=ti.execution_date,
                                                                           
state=(State.finished() + [State.UPSTREAM_FAILED])
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to