dstandish commented on code in PR #33570:
URL: https://github.com/apache/airflow/pull/33570#discussion_r1306476380
##########
airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -94,33 +98,41 @@ def _get_dep_statuses(
session: Session,
dep_context: DepContext,
) -> Iterator[TIDepStatus]:
+ # get all the setup tasks upstream of this task
+ setup_upstream_tasks = list(ti.task.get_upstreams_only_setups())
# Checking that all upstream dependencies have succeeded.
if not ti.task.upstream_task_ids:
yield self._passing_status(reason="The task instance did not have
any upstream tasks.")
return
- if ti.task.trigger_rule == TR.ALWAYS:
+ if ti.task.trigger_rule == TR.ALWAYS and not setup_upstream_tasks:
+ # even with ALWAYS trigger rule, we still need to check setup tasks
yield self._passing_status(reason="The task had a always trigger
rule set.")
return
- yield from self._evaluate_trigger_rule(ti=ti, dep_context=dep_context,
session=session)
+ yield from self._evaluate_trigger_rule(
+ ti=ti, dep_context=dep_context,
setup_upstream_tasks=setup_upstream_tasks, session=session
+ )
def _evaluate_trigger_rule(
self,
*,
ti: TaskInstance,
dep_context: DepContext,
+ setup_upstream_tasks: list[Operator] | None = None,
session: Session,
) -> Iterator[TIDepStatus]:
"""Evaluate whether ``ti``'s trigger rule was met.
:param ti: Task instance to evaluate the trigger rule of.
:param dep_context: The current dependency context.
+ setup_upstream_tasks: The setup tasks upstream of the current task.
Review Comment:
The docstring should probably clarify that this includes indirect upstream setup tasks, not only direct ones.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]