ashb commented on a change in pull request #4751: [AIRFLOW-3607] collected
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364240875
##########
File path: airflow/models/dagrun.py
##########
@@ -346,7 +331,38 @@ def update_state(self, session=None):
         session.merge(self)
         session.commit()
-        return self.state
+        return ready_tis
+
+    @provide_session
+    def get_ready_tis(self, session, scheduleable_tasks, finished_tasks):
+        ready_tis = []
+        for st in scheduleable_tasks:
+            if st.are_dependencies_met(
+                    dep_context=DepContext(
+                        flag_upstream_failed=True,
+                        finished_tasks=finished_tasks),
+                    session=session):
+                ready_tis.append(st)
+        return ready_tis
+
+    @provide_session
+    def are_runnable_tis(self, session, unfinished_tasks, finished_tasks):
+        # this is an optimization to avoid running on tasks that are not ready twice
+        not_ready_tis = []
+        # there might be runnable tasks that are up for retry and from some reason(retry delay, etc) are
+        # not ready yet so we set the flags to count them in
+        for ut in unfinished_tasks:
+            if ut.are_dependencies_met(
+                    dep_context=DepContext(
+                        flag_upstream_failed=True,
+                        ignore_in_retry_period=True,
+                        ignore_in_reschedule_period=True,
+                        finished_tasks=finished_tasks),
+                    session=session):
+                return False, not_ready_tis
Review comment:
Do you think you'd be able to add a test to cover this case, i.e. that a
single call to `dr.update_state` updates the state for all these TIs?
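
For illustration only, one possible shape for such a test is sketched below. It deliberately does not use the real Airflow models: `FakeTI`, `FakeDagRun`, `build_run` and the state strings are hypothetical stand-ins, and this `update_state` is a simplified single-pass model (all-upstream-success rule only), not the method under review. The point is just the assertion pattern: call `update_state` once, then check every TI.

```python
# Hypothetical stand-ins: these classes do NOT mirror airflow.models
# or its signatures; they only model "one pass updates every TI".
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeTI:
    task_id: str
    upstream: List[str] = field(default_factory=list)
    state: Optional[str] = None  # None means "not yet scheduled"


@dataclass
class FakeDagRun:
    tis: List[FakeTI]  # assumed to be listed in topological order

    def update_state(self) -> List[FakeTI]:
        """Single pass: flag TIs below a failure, return TIs ready to run."""
        by_id = {ti.task_id: ti for ti in self.tis}
        ready = []
        for ti in self.tis:
            if ti.state is not None:
                continue  # already finished, nothing to decide
            upstream_states = [by_id[u].state for u in ti.upstream]
            if any(s in ("failed", "upstream_failed") for s in upstream_states):
                ti.state = "upstream_failed"
            elif all(s == "success" for s in upstream_states):
                ready.append(ti)
        return ready


def build_run() -> FakeDagRun:
    # a (failed) -> b -> c, plus an independent chain d (success) -> e
    return FakeDagRun([
        FakeTI("a", state="failed"),
        FakeTI("b", upstream=["a"]),
        FakeTI("c", upstream=["b"]),
        FakeTI("d", state="success"),
        FakeTI("e", upstream=["d"]),
    ])


def test_single_update_state_call_updates_all_tis():
    dr = build_run()
    ready = dr.update_state()  # exactly one call
    states = {ti.task_id: ti.state for ti in dr.tis}
    # Both b and c are flagged in the same pass, not one per call.
    assert states["b"] == "upstream_failed"
    assert states["c"] == "upstream_failed"
    assert [ti.task_id for ti in ready] == ["e"]
```

A real test would build the DAG and `DagRun` with the project's own test helpers and assert on the actual task instance states after a single `dr.update_state()` call.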