This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 12d62e62c4 Validate the setup/teardown DAG in one place. (#32492)
12d62e62c4 is described below
commit 12d62e62c483a8f97119c3afe470ba06e250ea9a
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Jul 11 17:14:06 2023 +0100
Validate the setup/teardown DAG in one place. (#32492)
There's no need to have lots of validation in the context manager when we
can have a single check here that validates all usages, regardless of
whether they were made with the context manager.
---
airflow/models/dag.py | 11 ++-
airflow/utils/setup_teardown.py | 16 ----
tests/decorators/test_setup_teardown.py | 133 --------------------------------
tests/models/test_dag.py | 42 ++++++++++
4 files changed, 49 insertions(+), 153 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c4ccb84bf0..46e7f4608e 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -714,11 +714,14 @@ class DAG(LoggingMixin):
:meta private:
"""
for task in self.tasks:
- if not task.is_setup:
- continue
- if not any(x.is_teardown for x in task.downstream_list):
+ if task.is_setup and not any(x.is_teardown for x in
task.downstream_list):
raise AirflowDagInconsistent(
- "Dag has setup without teardown: dag='%s', task='%s'",
self.dag_id, task.task_id
+ f"Dag has setup without teardown: dag='{self.dag_id}',
task='{task.task_id}'"
+ )
+ if task.is_teardown and all(x.is_setup for x in
task.upstream_list):
+ raise AirflowDagInconsistent(
+ f"Dag has teardown task without an upstream work task:
dag='{self.dag_id}',"
+ f" task='{task.task_id}'"
)
def __repr__(self):
diff --git a/airflow/utils/setup_teardown.py b/airflow/utils/setup_teardown.py
index a82fbe739e..8ca7fa385d 100644
--- a/airflow/utils/setup_teardown.py
+++ b/airflow/utils/setup_teardown.py
@@ -142,26 +142,10 @@ class BaseSetupTeardownContext:
upstream_tasks = operator.upstream_list
downstream_list = operator.downstream_list
if setup:
- if upstream_tasks:
- cls.error("Setup tasks cannot have upstreams set manually on
the context manager")
cls.push_context_managed_setup_task(operator)
- for task in downstream_list:
- if not task.is_teardown:
- cls.error(
- "Downstream tasks to a setup task must be a teardown
task on the context manager"
- )
- if task.downstream_list:
- cls.error("Multiple shifts are not allowed in the context
manager")
if downstream_list:
cls.push_context_managed_teardown_task(list(downstream_list))
else:
- for task in upstream_tasks:
- if not task.is_setup:
- cls.error("Upstream tasks to a teardown task must be a
setup task on the context manager")
- if task.upstream_list:
- cls.error("Multiple shifts are not allowed in the context
manager")
- if downstream_list:
- cls.error("Downstream to a teardown task cannot be set
manually on the context manager")
cls.push_context_managed_teardown_task(operator)
if upstream_tasks:
cls.push_context_managed_setup_task(list(upstream_tasks))
diff --git a/tests/decorators/test_setup_teardown.py
b/tests/decorators/test_setup_teardown.py
index 8163c42b69..4fe195ae3e 100644
--- a/tests/decorators/test_setup_teardown.py
+++ b/tests/decorators/test_setup_teardown.py
@@ -1130,83 +1130,6 @@ class TestSetupTearDownTask:
"mytask",
}
- def test_work_task_inbetween_setup_n_teardown_tasks(self, dag_maker):
- @task
- def mytask():
- print("mytask")
-
- @setup
- def setuptask():
- print("setuptask")
-
- @teardown
- def teardowntask():
- print("teardowntask")
-
- with pytest.raises(
- ValueError, match="Upstream tasks to a teardown task must be a
setup task on the context manager"
- ):
- with dag_maker():
- with setuptask() >> mytask() >> teardowntask():
- ...
-
- def test_errors_when_work_task_is_upstream_of_setup_task(self, dag_maker):
- @task
- def mytask():
- print("mytask")
-
- @setup
- def setuptask():
- print("setuptask")
-
- with pytest.raises(
- ValueError, match="Setup tasks cannot have upstreams set manually
on the context manager"
- ):
- with dag_maker():
- with mytask() >> setuptask():
- ...
-
- def
test_errors_when_work_task_is_upstream_of_context_wrapper_with_teardown(self,
dag_maker):
- @task
- def mytask():
- print("mytask")
-
- @teardown
- def teardowntask():
- print("teardowntask")
-
- @teardown
- def teardowntask2():
- print("teardowntask")
-
- with pytest.raises(
- ValueError, match="Upstream tasks to a teardown task must be a
setup task on the context manager"
- ):
- with dag_maker():
- with mytask() >> context_wrapper([teardowntask(),
teardowntask2()]):
- ...
-
- def
test_errors_when_work_task_is_upstream_of_context_wrapper_with_setup(self,
dag_maker):
- @task
- def mytask():
- print("mytask")
-
- @setup
- def setuptask():
- print("setuptask")
-
- @setup
- def setuptask2():
- print("setuptask")
-
- with pytest.raises(
- ValueError,
- match="Downstream tasks to a setup task must be a teardown task on
the context manager",
- ):
- with dag_maker():
- with mytask() << context_wrapper([setuptask(), setuptask2()]):
- ...
-
def test_tasks_decorators_called_outside_context_manager_can_link_up(self,
dag_maker):
@setup
def setuptask():
@@ -1390,59 +1313,3 @@ class TestSetupTearDownTask:
assert dag.task_group.children["mytask2"].upstream_task_ids ==
{"setuptask2"}
assert dag.task_group.children["mytask2"].downstream_task_ids ==
{"teardowntask2"}
assert dag.task_group.children["teardowntask2"].upstream_task_ids ==
{"mytask2", "setuptask2"}
-
- def test_prevent_bad_usage_of_contextmanager(self, dag_maker):
-
- with dag_maker():
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated",
teardown_=True)
- with pytest.raises(
- ValueError,
- match="Downstream to a teardown task cannot be set manually on
the context manager",
- ):
- with setuptask << teardowntask:
- ...
-
- with dag_maker():
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- setuptask2 = make_task("setuptask2", type_="decorated",
setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated",
teardown_=True)
- with pytest.raises(ValueError, match="Multiple shifts are not
allowed in the context manager"):
- with setuptask >> setuptask2 >> teardowntask:
- ...
-
- with dag_maker():
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated",
teardown_=True)
- with pytest.raises(
- ValueError, match="Setup tasks cannot have upstreams set
manually on the context manager"
- ):
- with teardowntask >> setuptask:
- ...
-
- with dag_maker():
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated",
teardown_=True)
- teardowntask2 = make_task("teardowntask2", type_="decorated",
teardown_=True)
- with pytest.raises(ValueError, match="Multiple shifts are not
allowed in the context manager"):
- with teardowntask2 << teardowntask << setuptask:
- ...
-
- with dag_maker():
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated",
teardown_=True)
- setuptask2 = make_task("setuptask2", type_="decorated",
setup_=True)
- with pytest.raises(
- ValueError, match="Setup tasks cannot have upstreams set
manually on the context manager"
- ):
- with teardowntask >> context_wrapper([setuptask2, setuptask]):
- ...
-
- with dag_maker():
- setuptask = make_task("setuptask", type_="decorated", setup_=True)
- teardowntask = make_task("teardowntask", type_="decorated",
teardown_=True)
- setuptask2 = make_task("setuptask2", type_="decorated",
setup_=True)
- teardowntask2 = make_task("teardowntask2", type_="decorated",
teardown_=True)
- with pytest.raises(ValueError, match="Multiple shifts are not
allowed in the context manager"):
- with setuptask >> setuptask2 >> context_wrapper([teardowntask,
teardowntask2]):
- ...
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 0593c03fc4..14527fd7fa 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3915,3 +3915,45 @@ class TestTaskClearingSetupTeardownBehavior:
"my_setup", include_upstream=upstream,
include_downstream=downstream
).tasks
} == expected
+
+ def test_validate_setup_teardown_dag(self, dag_maker):
+ """Test some invalid setups and teardowns in a dag"""
+ with dag_maker("test_dag") as dag:
+ s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+
+ with s1:
+ w1 >> t1
+ w2 >> t1
+ with pytest.raises(
+ AirflowDagInconsistent, match="Dag has setup without teardown:
dag='test_dag', task='s1'"
+ ):
+ dag.validate()
+
+ with dag_maker("test_dag") as dag:
+ s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+ s1 >> w1 >> w2 >> t1
+ with pytest.raises(
+ AirflowDagInconsistent, match="Dag has setup without teardown:
dag='test_dag', task='s1'"
+ ):
+ dag.validate()
+
+ with dag_maker("test_dag") as dag:
+ s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+ w1 >> w2
+ with s1 >> t1:
+ ...
+ with pytest.raises(
+ AirflowDagInconsistent,
+ match="Dag has teardown task without an upstream work task:
dag='test_dag', task='t1'",
+ ):
+ dag.validate()
+
+ with dag_maker("test_dag") as dag:
+ s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+ s1 >> t1 >> w1 >> w2
+
+ with pytest.raises(
+ AirflowDagInconsistent,
+ match="Dag has teardown task without an upstream work task:
dag='test_dag', task='t1'",
+ ):
+ dag.validate()