This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f7c57937a60 Teardown Waiting for All in-scope Tasks to Complete
(#64181)
f7c57937a60 is described below
commit f7c57937a608587c433c691973606807c8fc93ba
Author: Hua Shi <[email protected]>
AuthorDate: Wed Mar 25 14:46:53 2026 -0700
Teardown Waiting for All in-scope Tasks to Complete (#64181)
* teardown runs only when all the tasks between setup and teardown are
complete
* calculate done in 1 line
* remove unneeded task_id from query
* remove unnecessary if for rare cases.
---
.../src/airflow/ti_deps/deps/trigger_rule_dep.py | 57 ++++++++++-
.../unit/ti_deps/deps/test_trigger_rule_dep.py | 111 +++++++++++++++++++++
2 files changed, 166 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index 5d2b6955d75..80d83db679a 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -619,6 +619,55 @@ class TriggerRuleDep(BaseTIDep):
reason=f"No strategy to evaluate trigger rule
'{trigger_rule_str}'."
)
+ def _evaluate_teardown_scope() -> Iterator[TIDepStatus]:
+ """Ensure all tasks between setup(s) and this teardown have
completed."""
+ if not task.dag:
+ return
+
+ setup_task_ids = {t.task_id for t in task.upstream_list if
t.is_setup}
+
+ all_upstream_ids = task.get_flat_relative_ids(upstream=True)
+ indirect_upstream_ids = all_upstream_ids - task.upstream_task_ids
+
+ if not indirect_upstream_ids:
+ return
+
+ in_scope_ids = set()
+ for setup_id in setup_task_ids:
+ setup_obj = task.dag.get_task(setup_id)
+ in_scope_ids.update(indirect_upstream_ids &
setup_obj.get_flat_relative_ids(upstream=False))
+
+ in_scope_tasks = {tid: task.dag.get_task(tid) for tid in
in_scope_ids}
+
+ done = sum(
+ 1
+ for x in
dep_context.ensure_finished_tis(ti.get_dagrun(session), session)
+ if _is_relevant_upstream(upstream=x, relevant_ids=in_scope_ids)
+ )
+
+ if not any(t.get_needs_expansion() for t in
in_scope_tasks.values()):
+ expected = len(in_scope_tasks)
+ else:
+ expected = (
+ session.scalar(
+ select(func.count(TaskInstance.task_id))
+ .where(TaskInstance.dag_id == ti.dag_id,
TaskInstance.run_id == ti.run_id)
+
.where(or_(*_iter_upstream_conditions(relevant_tasks=in_scope_tasks)))
+ )
+ or 0
+ )
+
+ if done < expected:
+ trigger_rule_str = getattr(task.trigger_rule, "value",
task.trigger_rule)
+ yield self._failing_status(
+ reason=(
+ f"Task's trigger rule '{trigger_rule_str}' requires
all tasks between "
+ f"setup and teardown to have completed, but found
{expected - done} "
+ f"in-scope task(s) not done. "
+ f"in_scope_task_ids={in_scope_ids}"
+ )
+ )
+
if not task.is_teardown:
# a teardown cannot have any indirect setups
if relevant_setups := {t.task_id: t for t in
task.get_upstreams_only_setups()}:
@@ -627,5 +676,9 @@ class TriggerRuleDep(BaseTIDep):
if not status.passed and changed:
# no need to evaluate trigger rule; we've already
marked as skipped or failed
return
-
- yield from _evaluate_direct_relatives()
+ yield from _evaluate_direct_relatives()
+ else:
+ statuses = list(_evaluate_direct_relatives())
+ yield from statuses
+ if not statuses:
+ yield from _evaluate_teardown_scope()
diff --git a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
index 31b240e4429..d43e3ef25b4 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
@@ -858,6 +858,117 @@ class TestTriggerRuleDep:
expected_ti_state=exp_state if exp_state and flag_upstream_failed
else None,
)
+ @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+ def test_teardown_waits_for_in_scope_tasks(self, session, dag_maker,
flag_upstream_failed):
+ """
+ Teardown should not run until all tasks between setup and teardown are
done.
+
+ Regression test for https://github.com/apache/airflow/issues/29332
+ """
+ with dag_maker(session=session):
+ setup = EmptyOperator(task_id="setup").as_setup()
+ t1 = EmptyOperator(task_id="t1")
+ t2 = EmptyOperator(task_id="t2")
+ t3 = EmptyOperator(task_id="t3")
+ teardown_task =
EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+ setup >> t1 >> t2 >> t3 >> teardown_task
+
+ dr = dag_maker.create_dagrun()
+ tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+ for task_id in ("setup", "t2", "t3"):
+ tis[task_id].state = SUCCESS
+ session.merge(tis[task_id])
+ session.flush()
+
+ teardown_ti = tis["teardown"]
+ teardown_ti.task = dag_maker.dag.get_task("teardown")
+ assert teardown_ti.state is None
+
+ dep_statuses = tuple(
+ TriggerRuleDep()._evaluate_trigger_rule(
+ ti=teardown_ti,
+
dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+ session=session,
+ )
+ )
+ assert len(dep_statuses) == 1
+ assert not dep_statuses[0].passed
+ assert "in-scope" in dep_statuses[0].reason
+
+ @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+ def test_teardown_runs_when_all_in_scope_tasks_done(self, session,
dag_maker, flag_upstream_failed):
+ """
+ Teardown should run when all tasks between setup and teardown are done.
+ """
+ with dag_maker(session=session):
+ setup = EmptyOperator(task_id="setup").as_setup()
+ t1 = EmptyOperator(task_id="t1")
+ t2 = EmptyOperator(task_id="t2")
+ t3 = EmptyOperator(task_id="t3")
+ teardown_task =
EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+ setup >> t1 >> t2 >> t3 >> teardown_task
+
+ dr = dag_maker.create_dagrun()
+ tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+ for task_id in ("setup", "t1", "t2", "t3"):
+ tis[task_id].state = SUCCESS
+ session.merge(tis[task_id])
+ session.flush()
+
+ teardown_ti = tis["teardown"]
+ teardown_ti.task = dag_maker.dag.get_task("teardown")
+ assert teardown_ti.state is None
+
+ dep_statuses = tuple(
+ TriggerRuleDep()._evaluate_trigger_rule(
+ ti=teardown_ti,
+
dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+ session=session,
+ )
+ )
+ assert not dep_statuses
+
+ @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+ def test_teardown_waits_for_multiple_cleared_in_scope_tasks(
+ self, session, dag_maker, flag_upstream_failed
+ ):
+ """
+ Teardown should wait when multiple in-scope tasks are not done.
+ """
+ with dag_maker(session=session):
+ setup = EmptyOperator(task_id="setup").as_setup()
+ t1 = EmptyOperator(task_id="t1")
+ t2 = EmptyOperator(task_id="t2")
+ t3 = EmptyOperator(task_id="t3")
+ teardown_task =
EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+ setup >> t1 >> t2 >> t3 >> teardown_task
+
+ dr = dag_maker.create_dagrun()
+ tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+ tis["setup"].state = SUCCESS
+ tis["t3"].state = SUCCESS
+ session.merge(tis["setup"])
+ session.merge(tis["t3"])
+ session.flush()
+
+ teardown_ti = tis["teardown"]
+ teardown_ti.task = dag_maker.dag.get_task("teardown")
+ assert teardown_ti.state is None
+
+ dep_statuses = tuple(
+ TriggerRuleDep()._evaluate_trigger_rule(
+ ti=teardown_ti,
+
dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+ session=session,
+ )
+ )
+ assert len(dep_statuses) == 1
+ assert not dep_statuses[0].passed
+ assert "2 in-scope" in dep_statuses[0].reason
+
@pytest.mark.parametrize(("flag_upstream_failed", "expected_ti_state"),
[(True, SKIPPED), (False, None)])
def test_all_skipped_tr_failure(
self, session, get_task_instance, flag_upstream_failed,
expected_ti_state