This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 566bc1b68b Fix tests and add logic to handle clearing setup directly 
(#32430)
566bc1b68b is described below

commit 566bc1b68b4e1643761b4e8518e5e556b8e6e82c
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Jul 7 15:52:14 2023 -0700

    Fix tests and add logic to handle clearing setup directly (#32430)
---
 airflow/models/dag.py    |  2 ++
 tests/models/test_dag.py | 77 +++++++++++++++++++++++++++++++++++++++++++-----
 2 files changed, 72 insertions(+), 7 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1ec78b64c3..234db54b15 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2373,6 +2373,8 @@ class DAG(LoggingMixin):
                 also_include.extend(t.get_upstreams_follow_setups())
             else:
                 
also_include.extend(t.get_upstreams_only_setups_and_teardowns())
+            if t.is_setup and not include_downstream:
+                also_include.extend(x for x in t.downstream_list if 
x.is_teardown)
 
         direct_upstreams: list[Operator] = []
         if include_direct_upstream:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index bd460e6cc7..e4a3290a5d 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3590,12 +3590,23 @@ class TestTaskClearingSetupTeardownBehavior:
         upstream = True
         return set(
             task.dag.partial_subset(
-                task_ids_or_regex=[task.task_id],
+                task_ids_or_regex=task.task_id,
                 include_downstream=not upstream,
                 include_upstream=upstream,
             ).tasks
         )
 
+    @staticmethod
+    def cleared_neither(task):
+        """Helper to return tasks that would be cleared if **upstream** 
selected."""
+        return set(
+            task.dag.partial_subset(
+                task_ids_or_regex=[task.task_id],
+                include_downstream=False,
+                include_upstream=False,
+            ).tasks
+        )
+
     def test_get_flat_relative_ids_with_setup(self):
         with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
             s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1")
@@ -3823,18 +3834,70 @@ class TestTaskClearingSetupTeardownBehavior:
         """
         with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
             s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
-            s1 >> w1 >> t1
+            s1 >> w1 >> t1.as_teardown(setups=s1)
             s1 >> w2
-            self.cleared_upstream(w2) == {s1, w2, t1}
+            # w2 is downstream of s1, so when clearing upstream, it should 
clear s1 (since it
+            # is upstream of w2) and t1 since it's the teardown for s1 even 
though not downstream of w1
+            assert self.cleared_upstream(w2) == {s1, w2, t1}
 
-    def clearing_teardown_no_clear_setup(self):
+    def test_clearing_teardown_no_clear_setup(self):
         with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
             s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
             s1 >> t1
             # clearing t1 does not clear s1
-            self.cleared_downstream(t1) == {t1}
+            assert self.cleared_downstream(t1) == {t1}
             s1 >> w1 >> t1
             # that isn't changed with the introduction of w1
-            self.cleared_downstream(t1) == {t1}
+            assert self.cleared_downstream(t1) == {t1}
             # though, of course, clearing w1 clears them all
-            self.cleared_downstream(w1) == {s1, w1, t1}
+            assert self.cleared_downstream(w1) == {s1, w1, t1}
+
+    def test_clearing_setup_clears_teardown(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
+            s1 >> t1
+            s1 >> w1 >> t1
+            # clearing w1 clears all always
+            assert self.cleared_upstream(w1) == {s1, w1, t1}
+            assert self.cleared_downstream(w1) == {s1, w1, t1}
+            assert self.cleared_neither(w1) == {s1, w1, t1}
+            # clearing s1 clears t1 always
+            assert self.cleared_upstream(s1) == {s1, t1}
+            assert self.cleared_downstream(s1) == {s1, w1, t1}
+            assert self.cleared_neither(s1) == {s1, t1}
+
+    @pytest.mark.parametrize(
+        "upstream, downstream, expected",
+        [
+            (False, False, {"my_teardown", "my_setup"}),
+            (False, True, {"my_setup", "my_work", "my_teardown"}),
+            (True, False, {"my_teardown", "my_setup"}),
+            (True, True, {"my_setup", "my_work", "my_teardown"}),
+        ],
+    )
+    def test_clearing_setup_clears_teardown_taskflow(self, upstream, 
downstream, expected):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+
+            @setup
+            def my_setup():
+                ...
+
+            @task_decorator
+            def my_work():
+                ...
+
+            @teardown
+            def my_teardown():
+                ...
+
+            s1 = my_setup()
+            w1 = my_work()
+            t1 = my_teardown()
+            s1 >> w1 >> t1
+            s1 >> t1
+        assert {
+            x.task_id
+            for x in dag.partial_subset(
+                "my_setup", include_upstream=upstream, 
include_downstream=downstream
+            ).tasks
+        } == expected

Reply via email to