This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a2eaca8977 Allow setup without teardown (#32679)
a2eaca8977 is described below

commit a2eaca897734ac43d5263832b319c99b3510b7b5
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Jul 19 11:54:58 2023 -0700

    Allow setup without teardown (#32679)
---
 airflow/models/abstractoperator.py |  7 +++-
 airflow/models/dag.py              |  4 --
 tests/models/test_dag.py           | 86 +++++++++++---------------------------
 3 files changed, 29 insertions(+), 68 deletions(-)

diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index 9a8f88ce7d..0c6d89fff3 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -302,7 +302,8 @@ class AbstractOperator(Templater, DAGNode):
         This method is meant to be used when we are clearing the task 
(non-upstream) and we need
         to add in the *relevant* setups and their teardowns.
 
-        Relevant in this case means, the setup has a teardown that is 
downstream of ``self``.
+        Relevant in this case means, the setup has a teardown that is 
downstream of ``self``,
+        or the setup has no teardowns.
         """
         downstream_teardown_ids = {
             x.task_id for x in self.get_flat_relatives(upstream=False) if 
x.is_teardown
@@ -310,7 +311,9 @@ class AbstractOperator(Templater, DAGNode):
         for task in self.get_flat_relatives(upstream=True):
             if not task.is_setup:
                 continue
-            if not 
task.downstream_task_ids.isdisjoint(downstream_teardown_ids):
+            has_no_teardowns = not any(True for x in task.downstream_list if 
x.is_teardown)
+            # if task has no teardowns or has teardowns downstream of self
+            if has_no_teardowns or 
task.downstream_task_ids.intersection(downstream_teardown_ids):
                 yield task
                 for t in task.downstream_list:
                     if t.is_teardown and not t == self:
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 391be9e582..1d380274f6 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -714,10 +714,6 @@ class DAG(LoggingMixin):
         :meta private:
         """
         for task in self.tasks:
-            if task.is_setup and not any(x.is_teardown for x in 
task.downstream_list):
-                raise AirflowDagInconsistent(
-                    f"Dag has setup without teardown: dag='{self.dag_id}', 
task='{task.task_id}'"
-                )
             if task.is_teardown and all(x.is_setup for x in 
task.upstream_list):
                 raise AirflowDagInconsistent(
                     f"Dag has teardown task without an upstream work task: 
dag='{self.dag_id}',"
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 14527fd7fa..494c7c7c66 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3623,39 +3623,46 @@ class TestTaskClearingSetupTeardownBehavior:
 
     def test_get_flat_relative_ids_with_setup(self):
         with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
-            s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1")
+            s1, w1, w2, w3, w4, t1 = self.make_tasks(dag, "s1, w1, w2, w3, w4, 
t1")
 
         s1 >> w1 >> w2 >> w3
 
-        # there is no teardown downstream of w1, so we assume w1 does not need 
s1
-        assert set(w1.get_upstreams_only_setups_and_teardowns()) == set()
+        # w1 is downstream of s1, and s1 has no teardown, so clearing w1 
clears s1
+        assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1}
         # same with w2 and w3
-        assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
-        assert set(w3.get_upstreams_only_setups_and_teardowns()) == set()
-        assert self.cleared_downstream(w2) == {w2, w3}
+        assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1}
+        assert set(w3.get_upstreams_only_setups_and_teardowns()) == {s1}
+        # so if we clear w2, we should also get s1, and w3, but not w1
+        assert self.cleared_downstream(w2) == {s1, w2, w3}
 
         w3 >> t1
 
         # now, w2 has a downstream teardown, but it's not connected directly 
to s1
-        # (this is how we signal "this is the teardown for this setup")
-        # so still, we don't regard s1 as a setup for w2
-        assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
-        assert self.cleared_downstream(w2) == {w2, w3, t1}
+        assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1}
+        # so if we clear downstream then s1 will be cleared, and t1 will be 
cleared but only by virtue of
+        # being downstream of w2 -- not as a result of being the teardown for 
s1, which it ain't
+        assert self.cleared_downstream(w2) == {s1, w2, w3, t1}
+        # and, another consequence of not linking s1 and t1 is that when we 
clear upstream, note that
+        # t1 doesn't get cleared -- cus it's not upstream and it's not linked 
to s1
+        assert self.cleared_upstream(w2) == {s1, w1, w2}
+        # note also that if we add a 4th work task after t1, it will still be 
"in scope" for s1
+        t1 >> w4
+        assert self.cleared_downstream(w4) == {s1, w4}
 
         s1 >> t1
 
-        # now, we know that t1 is the teardown for s1, and it's downstream of
-        # w2, so we can infer that w2 requires it, so now when we clear w2,
-        # we will get s1 (because it's a setup for w2) and t1 (because
-        # it is a teardown for s1)
+        # now, we know that t1 is the teardown for s1, so now we know that s1 
will be "torn down"
+        # by the time w4 runs, so we now know that w4 no longer requires s1, 
so when we clear w4,
+        # s1 will not also be cleared
+        self.cleared_downstream(w4) == {w4}
         assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
-        assert self.cleared_downstream(w1) == {s1, w1, w2, w3, t1}
+        assert self.cleared_downstream(w1) == {s1, w1, w2, w3, t1, w4}
         assert self.cleared_upstream(w1) == {s1, w1, t1}
         assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
         assert set(w2.get_upstreams_follow_setups()) == {s1, w1, t1}
-        assert self.cleared_downstream(w2) == {s1, w2, w3, t1}
+        assert self.cleared_downstream(w2) == {s1, w2, w3, t1, w4}
         assert self.cleared_upstream(w2) == {s1, w1, w2, t1}
-        assert self.cleared_downstream(w3) == {s1, w3, t1}
+        assert self.cleared_downstream(w3) == {s1, w3, t1, w4}
         assert self.cleared_upstream(w3) == {s1, w1, w2, w3, t1}
 
     def test_get_flat_relative_ids_with_setup_nested_ctx_mgr(self):
@@ -3695,32 +3702,6 @@ class TestTaskClearingSetupTeardownBehavior:
         # t1 not included because t1 is not downstream
         assert self.cleared_downstream(w3) == {s2, w3, t2}
 
-    def test_setup_without_teardown(self):
-        """A setup needs a teardown to define its scope."""
-        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
-            s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
-        # s1 has no teardown: fail
-        with pytest.raises(AirflowDagInconsistent):
-            dag.validate_setup_teardown()
-
-        s1 >> w1
-        # w1 depends on s1 but not as a "setup" per se, since s1 doesn't have 
a teardown to define
-        # its scope
-        with pytest.raises(AirflowDagInconsistent):
-            dag.validate_setup_teardown()
-
-        w1 >> t1
-        # now t1 is technically downstream of s1, but we still must wire it up 
explicitly
-        # to define the setup/teardown relationship
-        with pytest.raises(AirflowDagInconsistent):
-            dag.validate_setup_teardown()
-
-        s1 >> t1
-        # now, s1 and t1 are linked as setups and teardowns
-        # anything upstream of t1 and downstream of s1 is in the scope for s1
-        # so now this passes validation
-        dag.validate_setup_teardown()
-
     def test_get_flat_relative_ids_follows_teardowns(self):
         with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
             s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
@@ -3918,25 +3899,6 @@ class TestTaskClearingSetupTeardownBehavior:
 
     def test_validate_setup_teardown_dag(self, dag_maker):
         """Test some invalid setups and teardowns in a dag"""
-        with dag_maker("test_dag") as dag:
-            s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
-
-            with s1:
-                w1 >> t1
-                w2 >> t1
-        with pytest.raises(
-            AirflowDagInconsistent, match="Dag has setup without teardown: 
dag='test_dag', task='s1'"
-        ):
-            dag.validate()
-
-        with dag_maker("test_dag") as dag:
-            s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
-            s1 >> w1 >> w2 >> t1
-        with pytest.raises(
-            AirflowDagInconsistent, match="Dag has setup without teardown: 
dag='test_dag', task='s1'"
-        ):
-            dag.validate()
-
         with dag_maker("test_dag") as dag:
             s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
             w1 >> w2

Reply via email to