ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364238302
##########
File path: airflow/models/dagrun.py
##########
@@ -346,7 +331,38 @@ def update_state(self, session=None):
         session.merge(self)
         session.commit()
-        return self.state
+        return ready_tis
+
+    @provide_session
+    def get_ready_tis(self, session, scheduleable_tasks, finished_tasks):
+        ready_tis = []
+        for st in scheduleable_tasks:
+            if st.are_dependencies_met(
+                    dep_context=DepContext(
+                        flag_upstream_failed=True,
+                        finished_tasks=finished_tasks),
+                    session=session):
+                ready_tis.append(st)
+        return ready_tis
+
+    @provide_session
+    def are_runnable_tis(self, session, unfinished_tasks, finished_tasks):
+        # this is an optimization to avoid running on tasks that are not ready twice
+        not_ready_tis = []
+        # there might be runnable tasks that are up for retry and for some reason (retry delay, etc.) are
+        # not ready yet, so we set the flags to count them in
+        for ut in unfinished_tasks:
+            if ut.are_dependencies_met(
+                    dep_context=DepContext(
+                        flag_upstream_failed=True,
+                        ignore_in_retry_period=True,
+                        ignore_in_reschedule_period=True,
+                        finished_tasks=finished_tasks),
+                    session=session):
+                return False, not_ready_tis
Review comment:
This would abort the loop as soon as it reaches the first unfinished task whose dependencies are met, which I don't think we want to do, for two reasons:
1. If the DAG has multiple "paths", one path might not be ready but tasks on other paths in the DAG could be.
2. If a DAG has a lot of "fan-out" (`base >> [t1, t2, t3, t4]` etc.) then only one of the `tN` tasks would get marked as `upstream_failed` per loop; really, they should all be marked at the same time.
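To make the second point concrete, here is a minimal toy model in Python. It is not Airflow code: `Task`, `deps_met`, `scan_with_early_return`, `scan_all` and `build` are hypothetical names, and `deps_met` only approximates an `all_success` trigger rule checked with `flag_upstream_failed=True`, including the side effect of marking a task `upstream_failed` when one of its upstream tasks has failed. Because that marking happens during the check itself, returning on the first runnable task leaves every task after it in the list unevaluated and unmarked.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple

# State names mirror Airflow's, but everything below is a hypothetical toy model.
SUCCESS = "success"
FAILED = "failed"
UPSTREAM_FAILED = "upstream_failed"


@dataclass
class Task:
    """Hypothetical stand-in for a task instance (not Airflow's class)."""
    task_id: str
    upstream: List["Task"] = field(default_factory=list)
    state: Optional[str] = None  # None means "not finished yet"


def deps_met(task: Task, flag_upstream_failed: bool = True) -> bool:
    """Toy all_success check; with flag_upstream_failed it marks the task
    upstream_failed as a side effect when an upstream task has failed."""
    if any(u.state in (FAILED, UPSTREAM_FAILED) for u in task.upstream):
        if flag_upstream_failed:
            task.state = UPSTREAM_FAILED
        return False
    return all(u.state == SUCCESS for u in task.upstream)


def scan_with_early_return(unfinished: List[Task]) -> bool:
    """Like the loop under review: stops at the first runnable task."""
    for t in unfinished:
        if deps_met(t):
            return True
    return False


def scan_all(unfinished: List[Task]) -> Tuple[bool, List[Task]]:
    """Evaluates every unfinished task before deciding."""
    runnable = False
    not_ready = []
    for t in unfinished:
        if deps_met(t):
            runnable = True
        else:
            not_ready.append(t)
    return runnable, not_ready


def build() -> List[Task]:
    """base (failed) >> [t1, t2, t3, t4], plus an independent runnable task."""
    base = Task("base", state=FAILED)
    fan_out = [Task(f"t{i}", upstream=[base]) for i in range(1, 5)]
    other = Task("other")  # separate path with no upstream, so it is runnable
    # The runnable task sits second, so the early return fires before t2..t4.
    return [fan_out[0], other] + fan_out[1:]


if __name__ == "__main__":
    tasks = build()
    scan_with_early_return(tasks)
    print([(t.task_id, t.state) for t in tasks])
    # [('t1', 'upstream_failed'), ('other', None), ('t2', None), ('t3', None), ('t4', None)]

    tasks = build()
    scan_all(tasks)
    print([(t.task_id, t.state) for t in tasks])
    # [('t1', 'upstream_failed'), ('other', None), ('t2', 'upstream_failed'),
    #  ('t3', 'upstream_failed'), ('t4', 'upstream_failed')]
```

In this model the full scan marks every fan-out task in a single pass regardless of the order the tasks arrive in, at the cost of evaluating every unfinished task on each pass.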