This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ab9c958f615c32f95ba4e579fcf87d29d017f423
Author: Hua Shi <[email protected]>
AuthorDate: Wed Mar 25 14:46:53 2026 -0700

    Teardown Waiting for All in-scope Tasks to Complete (#64181)
    
    * teardown runs only when all the tasks between setup and teardown are complete
    
    * calculate done in 1 line
    
    * remove task_id not needed from the query
    
    * remove unnecessary if for rare cases.
    
    (cherry picked from commit f7c57937a608587c433c691973606807c8fc93ba)
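    
    For context, a minimal sketch of the DAG shape this change targets. Task ids
    are illustrative and the import paths are assumptions for Airflow 3; they may
    differ by version:
    
        # A linear setup >> work >> teardown chain. Before this change, the teardown
        # only checked its direct upstreams (setup and t3), so clearing t1 or t2 could
        # let the teardown run while those in-scope tasks were still pending.
        from airflow.providers.standard.operators.empty import EmptyOperator
        from airflow.sdk import DAG
    
        with DAG(dag_id="teardown_scope_example"):
            setup = EmptyOperator(task_id="setup").as_setup()
            t1 = EmptyOperator(task_id="t1")
            t2 = EmptyOperator(task_id="t2")
            t3 = EmptyOperator(task_id="t3")
            teardown = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
            setup >> t1 >> t2 >> t3 >> teardown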
---
 .../src/airflow/ti_deps/deps/trigger_rule_dep.py   |  57 ++++++++++-
 .../unit/ti_deps/deps/test_trigger_rule_dep.py     | 111 +++++++++++++++++++++
 2 files changed, 166 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
index 5d2b6955d75..80d83db679a 100644
--- a/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -619,6 +619,55 @@ class TriggerRuleDep(BaseTIDep):
                     reason=f"No strategy to evaluate trigger rule 
'{trigger_rule_str}'."
                 )
 
+        def _evaluate_teardown_scope() -> Iterator[TIDepStatus]:
+            """Ensure all tasks between setup(s) and this teardown have completed."""
+            if not task.dag:
+                return
+
+            setup_task_ids = {t.task_id for t in task.upstream_list if t.is_setup}
+
+            all_upstream_ids = task.get_flat_relative_ids(upstream=True)
+            indirect_upstream_ids = all_upstream_ids - task.upstream_task_ids
+
+            if not indirect_upstream_ids:
+                return
+
+            in_scope_ids = set()
+            for setup_id in setup_task_ids:
+                setup_obj = task.dag.get_task(setup_id)
+                in_scope_ids.update(indirect_upstream_ids & setup_obj.get_flat_relative_ids(upstream=False))
+
+            in_scope_tasks = {tid: task.dag.get_task(tid) for tid in in_scope_ids}
+
+            done = sum(
+                1
+                for x in dep_context.ensure_finished_tis(ti.get_dagrun(session), session)
+                if _is_relevant_upstream(upstream=x, relevant_ids=in_scope_ids)
+            )
+
+            if not any(t.get_needs_expansion() for t in in_scope_tasks.values()):
+                expected = len(in_scope_tasks)
+            else:
+                expected = (
+                    session.scalar(
+                        select(func.count(TaskInstance.task_id))
+                        .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id)
+                        .where(or_(*_iter_upstream_conditions(relevant_tasks=in_scope_tasks)))
+                    )
+                    or 0
+                )
+
+            if done < expected:
+                trigger_rule_str = getattr(task.trigger_rule, "value", task.trigger_rule)
+                yield self._failing_status(
+                    reason=(
+                        f"Task's trigger rule '{trigger_rule_str}' requires all tasks between "
+                        f"setup and teardown to have completed, but found {expected - done} "
+                        f"in-scope task(s) not done. "
+                        f"in_scope_task_ids={in_scope_ids}"
+                    )
+                )
+
         if not task.is_teardown:
             # a teardown cannot have any indirect setups
             if relevant_setups := {t.task_id: t for t in task.get_upstreams_only_setups()}:
@@ -627,5 +676,9 @@ class TriggerRuleDep(BaseTIDep):
                     if not status.passed and changed:
                         # no need to evaluate trigger rule; we've already marked as skipped or failed
                         return
-
-        yield from _evaluate_direct_relatives()
+            yield from _evaluate_direct_relatives()
+        else:
+            statuses = list(_evaluate_direct_relatives())
+            yield from statuses
+            if not statuses:
+                yield from _evaluate_teardown_scope()
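
A self-contained sketch of the in-scope computation above, using plain sets in
place of the task graph (values taken from the example DAG in the tests below;
no Airflow imports, for illustration only):

    # setup >> t1 >> t2 >> t3 >> teardown, with the teardown's setups={setup}
    direct_upstreams = {"setup", "t3"}                    # teardown's direct upstream task ids
    all_upstreams = {"setup", "t1", "t2", "t3"}           # flat upstream closure of the teardown
    setup_descendants = {"t1", "t2", "t3", "teardown"}    # flat downstream closure of the setup

    indirect_upstreams = all_upstreams - direct_upstreams  # {"t1", "t2"}
    in_scope = indirect_upstreams & setup_descendants      # {"t1", "t2"}

    # The teardown is held back until every task in `in_scope` is finished;
    # otherwise the dep yields a failing status naming the unfinished count.
    print(sorted(in_scope))  # ['t1', 't2']
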
diff --git a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
index 31b240e4429..d43e3ef25b4 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_trigger_rule_dep.py
@@ -858,6 +858,117 @@ class TestTriggerRuleDep:
             expected_ti_state=exp_state if exp_state and flag_upstream_failed else None,
         )
 
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_waits_for_in_scope_tasks(self, session, dag_maker, flag_upstream_failed):
+        """
+        Teardown should not run until all tasks between setup and teardown are done.
+
+        Regression test for https://github.com/apache/airflow/issues/29332
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
+
+        dr = dag_maker.create_dagrun()
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+        for task_id in ("setup", "t2", "t3"):
+            tis[task_id].state = SUCCESS
+            session.merge(tis[task_id])
+        session.flush()
+
+        teardown_ti = tis["teardown"]
+        teardown_ti.task = dag_maker.dag.get_task("teardown")
+        assert teardown_ti.state is None
+
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=teardown_ti,
+                dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+                session=session,
+            )
+        )
+        assert len(dep_statuses) == 1
+        assert not dep_statuses[0].passed
+        assert "in-scope" in dep_statuses[0].reason
+
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_runs_when_all_in_scope_tasks_done(self, session, dag_maker, flag_upstream_failed):
+        """
+        Teardown should run when all tasks between setup and teardown are done.
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
+
+        dr = dag_maker.create_dagrun()
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+        for task_id in ("setup", "t1", "t2", "t3"):
+            tis[task_id].state = SUCCESS
+            session.merge(tis[task_id])
+        session.flush()
+
+        teardown_ti = tis["teardown"]
+        teardown_ti.task = dag_maker.dag.get_task("teardown")
+        assert teardown_ti.state is None
+
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=teardown_ti,
+                dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+                session=session,
+            )
+        )
+        assert not dep_statuses
+
+    @pytest.mark.parametrize("flag_upstream_failed", [True, False])
+    def test_teardown_waits_for_multiple_cleared_in_scope_tasks(
+        self, session, dag_maker, flag_upstream_failed
+    ):
+        """
+        Teardown should wait when multiple in-scope tasks are not done.
+        """
+        with dag_maker(session=session):
+            setup = EmptyOperator(task_id="setup").as_setup()
+            t1 = EmptyOperator(task_id="t1")
+            t2 = EmptyOperator(task_id="t2")
+            t3 = EmptyOperator(task_id="t3")
+            teardown_task = EmptyOperator(task_id="teardown").as_teardown(setups=setup)
+            setup >> t1 >> t2 >> t3 >> teardown_task
+
+        dr = dag_maker.create_dagrun()
+        tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
+
+        tis["setup"].state = SUCCESS
+        tis["t3"].state = SUCCESS
+        session.merge(tis["setup"])
+        session.merge(tis["t3"])
+        session.flush()
+
+        teardown_ti = tis["teardown"]
+        teardown_ti.task = dag_maker.dag.get_task("teardown")
+        assert teardown_ti.state is None
+
+        dep_statuses = tuple(
+            TriggerRuleDep()._evaluate_trigger_rule(
+                ti=teardown_ti,
+                dep_context=DepContext(flag_upstream_failed=flag_upstream_failed),
+                session=session,
+            )
+        )
+        assert len(dep_statuses) == 1
+        assert not dep_statuses[0].passed
+        assert "2 in-scope" in dep_statuses[0].reason
+
     @pytest.mark.parametrize(("flag_upstream_failed", "expected_ti_state"), [(True, SKIPPED), (False, None)])
     def test_all_skipped_tr_failure(
         self, session, get_task_instance, flag_upstream_failed, expected_ti_state
