This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch v3-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ab9c958f615c32f95ba4e579fcf87d29d017f423 Author: Hua Shi <[email protected]> AuthorDate: Wed Mar 25 14:46:53 2026 -0700 Teardown Waiting for All in-scope Tasks to Complete (#64181) * teardown runs only when all the tasks between setup and teardown are complete * calculate done in 1 line * removed unneeded task_id from query * remove unnecessary if for rare cases. (cherry picked from commit f7c57937a608587c433c691973606807c8fc93ba) --- .../src/airflow/ti_deps/deps/trigger_rule_dep.py | 57 ++++++++++- .../unit/ti_deps/deps/test_trigger_rule_dep.py | 111 +++++++++++++++++++++ 2 files changed, 166 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py index 5d2b6955d75..80d83db679a 100644 --- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py @@ -619,6 +619,55 @@ class TriggerRuleDep(BaseTIDep): reason=f"No strategy to evaluate trigger rule '{trigger_rule_str}'." 
) + def _evaluate_teardown_scope() -> Iterator[TIDepStatus]: + """Ensure all tasks between setup(s) and this teardown have completed.""" + if not task.dag: + return + + setup_task_ids = {t.task_id for t in task.upstream_list if t.is_setup} + + all_upstream_ids = task.get_flat_relative_ids(upstream=True) + indirect_upstream_ids = all_upstream_ids - task.upstream_task_ids + + if not indirect_upstream_ids: + return + + in_scope_ids = set() + for setup_id in setup_task_ids: + setup_obj = task.dag.get_task(setup_id) + in_scope_ids.update(indirect_upstream_ids & setup_obj.get_flat_relative_ids(upstream=False)) + + in_scope_tasks = {tid: task.dag.get_task(tid) for tid in in_scope_ids} + + done = sum( + 1 + for x in dep_context.ensure_finished_tis(ti.get_dagrun(session), session) + if _is_relevant_upstream(upstream=x, relevant_ids=in_scope_ids) + ) + + if not any(t.get_needs_expansion() for t in in_scope_tasks.values()): + expected = len(in_scope_tasks) + else: + expected = ( + session.scalar( + select(func.count(TaskInstance.task_id)) + .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id) + .where(or_(*_iter_upstream_conditions(relevant_tasks=in_scope_tasks))) + ) + or 0 + ) + + if done < expected: + trigger_rule_str = getattr(task.trigger_rule, "value", task.trigger_rule) + yield self._failing_status( + reason=( + f"Task's trigger rule '{trigger_rule_str}' requires all tasks between " + f"setup and teardown to have completed, but found {expected - done} " + f"in-scope task(s) not done. 
" + f"in_scope_task_ids={in_scope_ids}" + ) + ) + if not task.is_teardown: # a teardown cannot have any indirect setups if relevant_setups := {t.task_id: t for t in task.get_upstreams_only_setups()}: @@ -627,5 +676,9 @@ class TriggerRuleDep(BaseTIDep): if not status.passed and changed: # no need to evaluate trigger rule; we've already marked as skipped or failed return - - yield from _evaluate_direct_relatives() + yield from _evaluate_direct_relatives() + else: + statuses = list(_evaluate_direct_relatives()) + yield from statuses + if not statuses: + yield from _evaluate_teardown_scope() diff --git a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py index 31b240e4429..d43e3ef25b4 100644 --- a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py +++ b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py @@ -858,6 +858,117 @@ class TestTriggerRuleDep: expected_ti_state=exp_state if exp_state and flag_upstream_failed else None, ) + @pytest.mark.parametrize("flag_upstream_failed", [True, False]) + def test_teardown_waits_for_in_scope_tasks(self, session, dag_maker, flag_upstream_failed): + """ + Teardown should not run until all tasks between setup and teardown are done. 
+ + Regression test for https://github.com/apache/airflow/issues/29332 + """ + with dag_maker(session=session): + setup = EmptyOperator(task_id="setup").as_setup() + t1 = EmptyOperator(task_id="t1") + t2 = EmptyOperator(task_id="t2") + t3 = EmptyOperator(task_id="t3") + teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup) + setup >> t1 >> t2 >> t3 >> teardown_task + + dr = dag_maker.create_dagrun() + tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)} + + for task_id in ("setup", "t2", "t3"): + tis[task_id].state = SUCCESS + session.merge(tis[task_id]) + session.flush() + + teardown_ti = tis["teardown"] + teardown_ti.task = dag_maker.dag.get_task("teardown") + assert teardown_ti.state is None + + dep_statuses = tuple( + TriggerRuleDep()._evaluate_trigger_rule( + ti=teardown_ti, + dep_context=DepContext(flag_upstream_failed=flag_upstream_failed), + session=session, + ) + ) + assert len(dep_statuses) == 1 + assert not dep_statuses[0].passed + assert "in-scope" in dep_statuses[0].reason + + @pytest.mark.parametrize("flag_upstream_failed", [True, False]) + def test_teardown_runs_when_all_in_scope_tasks_done(self, session, dag_maker, flag_upstream_failed): + """ + Teardown should run when all tasks between setup and teardown are done. 
+ """ + with dag_maker(session=session): + setup = EmptyOperator(task_id="setup").as_setup() + t1 = EmptyOperator(task_id="t1") + t2 = EmptyOperator(task_id="t2") + t3 = EmptyOperator(task_id="t3") + teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup) + setup >> t1 >> t2 >> t3 >> teardown_task + + dr = dag_maker.create_dagrun() + tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)} + + for task_id in ("setup", "t1", "t2", "t3"): + tis[task_id].state = SUCCESS + session.merge(tis[task_id]) + session.flush() + + teardown_ti = tis["teardown"] + teardown_ti.task = dag_maker.dag.get_task("teardown") + assert teardown_ti.state is None + + dep_statuses = tuple( + TriggerRuleDep()._evaluate_trigger_rule( + ti=teardown_ti, + dep_context=DepContext(flag_upstream_failed=flag_upstream_failed), + session=session, + ) + ) + assert not dep_statuses + + @pytest.mark.parametrize("flag_upstream_failed", [True, False]) + def test_teardown_waits_for_multiple_cleared_in_scope_tasks( + self, session, dag_maker, flag_upstream_failed + ): + """ + Teardown should wait when multiple in-scope tasks are not done. 
+ """ + with dag_maker(session=session): + setup = EmptyOperator(task_id="setup").as_setup() + t1 = EmptyOperator(task_id="t1") + t2 = EmptyOperator(task_id="t2") + t3 = EmptyOperator(task_id="t3") + teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup) + setup >> t1 >> t2 >> t3 >> teardown_task + + dr = dag_maker.create_dagrun() + tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)} + + tis["setup"].state = SUCCESS + tis["t3"].state = SUCCESS + session.merge(tis["setup"]) + session.merge(tis["t3"]) + session.flush() + + teardown_ti = tis["teardown"] + teardown_ti.task = dag_maker.dag.get_task("teardown") + assert teardown_ti.state is None + + dep_statuses = tuple( + TriggerRuleDep()._evaluate_trigger_rule( + ti=teardown_ti, + dep_context=DepContext(flag_upstream_failed=flag_upstream_failed), + session=session, + ) + ) + assert len(dep_statuses) == 1 + assert not dep_statuses[0].passed + assert "2 in-scope" in dep_statuses[0].reason + @pytest.mark.parametrize(("flag_upstream_failed", "expected_ti_state"), [(True, SKIPPED), (False, None)]) def test_all_skipped_tr_failure( self, session, get_task_instance, flag_upstream_failed, expected_ti_state
