This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 12d62e62c4 Validate the setup/teardown DAG in one place. (#32492)
12d62e62c4 is described below

commit 12d62e62c483a8f97119c3afe470ba06e250ea9a
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Jul 11 17:14:06 2023 +0100

    Validate the setup/teardown DAG in one place. (#32492)
    
    There's no need to have lots of validation in the context manager when we
    can have a single check here that validates all usages, regardless of
    whether they were done with the context manager.
---
 airflow/models/dag.py                   |  11 ++-
 airflow/utils/setup_teardown.py         |  16 ----
 tests/decorators/test_setup_teardown.py | 133 --------------------------------
 tests/models/test_dag.py                |  42 ++++++++++
 4 files changed, 49 insertions(+), 153 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c4ccb84bf0..46e7f4608e 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -714,11 +714,14 @@ class DAG(LoggingMixin):
         :meta private:
         """
         for task in self.tasks:
-            if not task.is_setup:
-                continue
-            if not any(x.is_teardown for x in task.downstream_list):
+            if task.is_setup and not any(x.is_teardown for x in task.downstream_list):
                 raise AirflowDagInconsistent(
-                    "Dag has setup without teardown: dag='%s', task='%s'", self.dag_id, task.task_id
+                    f"Dag has setup without teardown: dag='{self.dag_id}', task='{task.task_id}'"
+                )
+            if task.is_teardown and all(x.is_setup for x in task.upstream_list):
+                raise AirflowDagInconsistent(
+                    f"Dag has teardown task without an upstream work task: dag='{self.dag_id}',"
+                    f" task='{task.task_id}'"
                 )
 
     def __repr__(self):
diff --git a/airflow/utils/setup_teardown.py b/airflow/utils/setup_teardown.py
index a82fbe739e..8ca7fa385d 100644
--- a/airflow/utils/setup_teardown.py
+++ b/airflow/utils/setup_teardown.py
@@ -142,26 +142,10 @@ class BaseSetupTeardownContext:
             upstream_tasks = operator.upstream_list
             downstream_list = operator.downstream_list
         if setup:
-            if upstream_tasks:
-                cls.error("Setup tasks cannot have upstreams set manually on 
the context manager")
             cls.push_context_managed_setup_task(operator)
-            for task in downstream_list:
-                if not task.is_teardown:
-                    cls.error(
-                        "Downstream tasks to a setup task must be a teardown 
task on the context manager"
-                    )
-                if task.downstream_list:
-                    cls.error("Multiple shifts are not allowed in the context 
manager")
             if downstream_list:
                 cls.push_context_managed_teardown_task(list(downstream_list))
         else:
-            for task in upstream_tasks:
-                if not task.is_setup:
-                    cls.error("Upstream tasks to a teardown task must be a 
setup task on the context manager")
-                if task.upstream_list:
-                    cls.error("Multiple shifts are not allowed in the context 
manager")
-            if downstream_list:
-                cls.error("Downstream to a teardown task cannot be set 
manually on the context manager")
             cls.push_context_managed_teardown_task(operator)
             if upstream_tasks:
                 cls.push_context_managed_setup_task(list(upstream_tasks))
diff --git a/tests/decorators/test_setup_teardown.py 
b/tests/decorators/test_setup_teardown.py
index 8163c42b69..4fe195ae3e 100644
--- a/tests/decorators/test_setup_teardown.py
+++ b/tests/decorators/test_setup_teardown.py
@@ -1130,83 +1130,6 @@ class TestSetupTearDownTask:
             "mytask",
         }
 
-    def test_work_task_inbetween_setup_n_teardown_tasks(self, dag_maker):
-        @task
-        def mytask():
-            print("mytask")
-
-        @setup
-        def setuptask():
-            print("setuptask")
-
-        @teardown
-        def teardowntask():
-            print("teardowntask")
-
-        with pytest.raises(
-            ValueError, match="Upstream tasks to a teardown task must be a 
setup task on the context manager"
-        ):
-            with dag_maker():
-                with setuptask() >> mytask() >> teardowntask():
-                    ...
-
-    def test_errors_when_work_task_is_upstream_of_setup_task(self, dag_maker):
-        @task
-        def mytask():
-            print("mytask")
-
-        @setup
-        def setuptask():
-            print("setuptask")
-
-        with pytest.raises(
-            ValueError, match="Setup tasks cannot have upstreams set manually 
on the context manager"
-        ):
-            with dag_maker():
-                with mytask() >> setuptask():
-                    ...
-
-    def 
test_errors_when_work_task_is_upstream_of_context_wrapper_with_teardown(self, 
dag_maker):
-        @task
-        def mytask():
-            print("mytask")
-
-        @teardown
-        def teardowntask():
-            print("teardowntask")
-
-        @teardown
-        def teardowntask2():
-            print("teardowntask")
-
-        with pytest.raises(
-            ValueError, match="Upstream tasks to a teardown task must be a 
setup task on the context manager"
-        ):
-            with dag_maker():
-                with mytask() >> context_wrapper([teardowntask(), 
teardowntask2()]):
-                    ...
-
-    def 
test_errors_when_work_task_is_upstream_of_context_wrapper_with_setup(self, 
dag_maker):
-        @task
-        def mytask():
-            print("mytask")
-
-        @setup
-        def setuptask():
-            print("setuptask")
-
-        @setup
-        def setuptask2():
-            print("setuptask")
-
-        with pytest.raises(
-            ValueError,
-            match="Downstream tasks to a setup task must be a teardown task on 
the context manager",
-        ):
-            with dag_maker():
-                with mytask() << context_wrapper([setuptask(), setuptask2()]):
-                    ...
-
     def test_tasks_decorators_called_outside_context_manager_can_link_up(self, 
dag_maker):
         @setup
         def setuptask():
@@ -1390,59 +1313,3 @@ class TestSetupTearDownTask:
         assert dag.task_group.children["mytask2"].upstream_task_ids == 
{"setuptask2"}
         assert dag.task_group.children["mytask2"].downstream_task_ids == 
{"teardowntask2"}
         assert dag.task_group.children["teardowntask2"].upstream_task_ids == 
{"mytask2", "setuptask2"}
-
-    def test_prevent_bad_usage_of_contextmanager(self, dag_maker):
-
-        with dag_maker():
-            setuptask = make_task("setuptask", type_="decorated", setup_=True)
-            teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-            with pytest.raises(
-                ValueError,
-                match="Downstream to a teardown task cannot be set manually on 
the context manager",
-            ):
-                with setuptask << teardowntask:
-                    ...
-
-        with dag_maker():
-            setuptask = make_task("setuptask", type_="decorated", setup_=True)
-            setuptask2 = make_task("setuptask2", type_="decorated", 
setup_=True)
-            teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-            with pytest.raises(ValueError, match="Multiple shifts are not 
allowed in the context manager"):
-                with setuptask >> setuptask2 >> teardowntask:
-                    ...
-
-        with dag_maker():
-            setuptask = make_task("setuptask", type_="decorated", setup_=True)
-            teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-            with pytest.raises(
-                ValueError, match="Setup tasks cannot have upstreams set 
manually on the context manager"
-            ):
-                with teardowntask >> setuptask:
-                    ...
-
-        with dag_maker():
-            setuptask = make_task("setuptask", type_="decorated", setup_=True)
-            teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-            teardowntask2 = make_task("teardowntask2", type_="decorated", 
teardown_=True)
-            with pytest.raises(ValueError, match="Multiple shifts are not 
allowed in the context manager"):
-                with teardowntask2 << teardowntask << setuptask:
-                    ...
-
-        with dag_maker():
-            setuptask = make_task("setuptask", type_="decorated", setup_=True)
-            teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-            setuptask2 = make_task("setuptask2", type_="decorated", 
setup_=True)
-            with pytest.raises(
-                ValueError, match="Setup tasks cannot have upstreams set 
manually on the context manager"
-            ):
-                with teardowntask >> context_wrapper([setuptask2, setuptask]):
-                    ...
-
-        with dag_maker():
-            setuptask = make_task("setuptask", type_="decorated", setup_=True)
-            teardowntask = make_task("teardowntask", type_="decorated", 
teardown_=True)
-            setuptask2 = make_task("setuptask2", type_="decorated", 
setup_=True)
-            teardowntask2 = make_task("teardowntask2", type_="decorated", 
teardown_=True)
-            with pytest.raises(ValueError, match="Multiple shifts are not 
allowed in the context manager"):
-                with setuptask >> setuptask2 >> context_wrapper([teardowntask, 
teardowntask2]):
-                    ...
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 0593c03fc4..14527fd7fa 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3915,3 +3915,45 @@ class TestTaskClearingSetupTeardownBehavior:
                 "my_setup", include_upstream=upstream, include_downstream=downstream
             ).tasks
         } == expected
+
+    def test_validate_setup_teardown_dag(self, dag_maker):
+        """Test some invalid setups and teardowns in a dag"""
+        with dag_maker("test_dag") as dag:
+            s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+
+            with s1:
+                w1 >> t1
+                w2 >> t1
+        with pytest.raises(
+            AirflowDagInconsistent, match="Dag has setup without teardown: dag='test_dag', task='s1'"
+        ):
+            dag.validate()
+
+        with dag_maker("test_dag") as dag:
+            s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+            s1 >> w1 >> w2 >> t1
+        with pytest.raises(
+            AirflowDagInconsistent, match="Dag has setup without teardown: dag='test_dag', task='s1'"
+        ):
+            dag.validate()
+
+        with dag_maker("test_dag") as dag:
+            s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+            w1 >> w2
+            with s1 >> t1:
+                ...
+        with pytest.raises(
+            AirflowDagInconsistent,
+            match="Dag has teardown task without an upstream work task: dag='test_dag', task='t1'",
+        ):
+            dag.validate()
+
+        with dag_maker("test_dag") as dag:
+            s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+            s1 >> t1 >> w1 >> w2
+
+        with pytest.raises(
+            AirflowDagInconsistent,
+            match="Dag has teardown task without an upstream work task: dag='test_dag', task='t1'",
+        ):
+            dag.validate()

Reply via email to