ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364238302
 
 

 ##########
 File path: airflow/models/dagrun.py
 ##########
 @@ -346,7 +331,38 @@ def update_state(self, session=None):
         session.merge(self)
         session.commit()
 
-        return self.state
+        return ready_tis
+
+    @provide_session
+    def get_ready_tis(self, session, scheduleable_tasks, finished_tasks):
+        ready_tis = []
+        for st in scheduleable_tasks:
+            if st.are_dependencies_met(
+                dep_context=DepContext(
+                    flag_upstream_failed=True,
+                    finished_tasks=finished_tasks),
+                    session=session):
+                ready_tis.append(st)
+        return ready_tis
+
+    @provide_session
+    def are_runnable_tis(self, session, unfinished_tasks, finished_tasks):
+        # this is an optimization to avoid running on tasks that are not ready twice
+        not_ready_tis = []
+        # there might be runnable tasks that are up for retry and for some reason (retry delay, etc.) are
+        # not ready yet so we set the flags to count them in
+        for ut in unfinished_tasks:
+            if ut.are_dependencies_met(
+                dep_context=DepContext(
+                    flag_upstream_failed=True,
+                    ignore_in_retry_period=True,
+                    ignore_in_reschedule_period=True,
+                    finished_tasks=finished_tasks),
+                    session=session):
+                return False, not_ready_tis
 
 Review comment:
  This would abort the loop as soon as it reaches the first unfinished task whose dependencies are met, which I don't think we want, for two reasons:
   
  1. If the DAG has multiple "paths", one path might not be ready but other tasks in the DAG could be.
  2. If a DAG has a lot of "fan-out" (`base >> [t1, t2, t3, t4]` etc.), then only one of the `tN` tasks would get marked as `upstream_failed` per loop; they should really all be marked as failed at the same time.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
