This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b99f1b1b2c Call setup / teardown validation in dagbag load (#32062)
b99f1b1b2c is described below

commit b99f1b1b2c9fe166a8b2c080c473b02c89d0c2b8
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Jun 22 09:36:24 2023 -0700

    Call setup / teardown validation in dagbag load (#32062)
---
 .../example_setup_teardown_taskflow.py             | 13 ++++++-----
 airflow/models/dag.py                              |  1 +
 tests/models/test_dag.py                           | 26 ++++++++++++++++++++++
 3 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/airflow/example_dags/example_setup_teardown_taskflow.py b/airflow/example_dags/example_setup_teardown_taskflow.py
index 7e47475d16..245cc6a2e9 100644
--- a/airflow/example_dags/example_setup_teardown_taskflow.py
+++ b/airflow/example_dags/example_setup_teardown_taskflow.py
@@ -61,10 +61,13 @@ with DAG(
         def hello():
             print("I say hello")
 
-        my_setup()
-        hello()
-        my_teardown()
+        s = my_setup()
+        w = hello()
+        t = my_teardown()
+        s >> w >> t
+        s >> t
 
-    root_setup()
+    rs = root_setup()
     normal() >> section_1()
-    root_teardown()
+    rt = root_teardown()
+    rs >> rt
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index f0c513e41c..906ae989e2 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -685,6 +685,7 @@ class DAG(LoggingMixin):
             )
         self.params.validate()
         self.timetable.validate()
+        self.validate_setup_teardown()
 
     def validate_setup_teardown(self):
         """
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9d510fa428..fa2bcd7052 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3805,3 +3805,29 @@ class TestTaskClearingSetupTeardownBehavior:
             "g2.w1",
             "g2.w2",
         }
+
+    def test_clear_upstream_not_your_setup(self):
+        """
+        When you have a work task that comes after a setup, then if you clear upstream
+        the setup (and its teardown) will be cleared even though strictly speaking you don't
+        "require" it since, depending on speed of execution, it might be torn down by t1
+        before / while w2 runs.  It just gets cleared by virtue of it being upstream, and
+        that's what you requested.  And its teardown gets cleared too.  But w1 doesn't.
+        """
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+            s1 >> w1 >> t1
+            s1 >> w2
+            assert self.cleared_upstream(w2) == {s1, w2, t1}
+
+    def test_clearing_teardown_no_clear_setup(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
+            s1 >> t1
+            # clearing downstream of teardown t1 does not clear its setup s1
+            assert self.cleared_downstream(t1) == {t1}
+            s1 >> w1 >> t1
+            # that isn't changed with the introduction of work task w1
+            assert self.cleared_downstream(t1) == {t1}
+            # though, of course, clearing w1 clears them all
+            assert self.cleared_downstream(w1) == {s1, w1, t1}

Reply via email to