This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b99f1b1b2c Call setup / teardown validation in dagbag load (#32062)
b99f1b1b2c is described below
commit b99f1b1b2c9fe166a8b2c080c473b02c89d0c2b8
Author: Daniel Standish <[email protected]>
AuthorDate: Thu Jun 22 09:36:24 2023 -0700
Call setup / teardown validation in dagbag load (#32062)
---
.../example_setup_teardown_taskflow.py | 13 ++++++-----
airflow/models/dag.py | 1 +
tests/models/test_dag.py | 26 ++++++++++++++++++++++
3 files changed, 35 insertions(+), 5 deletions(-)
diff --git a/airflow/example_dags/example_setup_teardown_taskflow.py
b/airflow/example_dags/example_setup_teardown_taskflow.py
index 7e47475d16..245cc6a2e9 100644
--- a/airflow/example_dags/example_setup_teardown_taskflow.py
+++ b/airflow/example_dags/example_setup_teardown_taskflow.py
@@ -61,10 +61,13 @@ with DAG(
def hello():
print("I say hello")
- my_setup()
- hello()
- my_teardown()
+ s = my_setup()
+ w = hello()
+ t = my_teardown()
+ s >> w >> t
+ s >> t
- root_setup()
+ rs = root_setup()
normal() >> section_1()
- root_teardown()
+ rt = root_teardown()
+ rs >> rt
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index f0c513e41c..906ae989e2 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -685,6 +685,7 @@ class DAG(LoggingMixin):
)
self.params.validate()
self.timetable.validate()
+ self.validate_setup_teardown()
def validate_setup_teardown(self):
"""
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 9d510fa428..fa2bcd7052 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3805,3 +3805,29 @@ class TestTaskClearingSetupTeardownBehavior:
"g2.w1",
"g2.w2",
}
+
+ def test_clear_upstream_not_your_setup(self):
+ """
+ When you have a work task that comes after a setup, then if you clear
upstream
+ the setup (and its teardown) will be cleared even though strictly
speaking you don't
+ "require" it since, depending on speed of execution, it might be torn
down by t1
+ before / while w2 runs. It just gets cleared by virtue of it being
upstream, and
+ that's what you requested. And it's teardown gets cleared too. But
w1 doesn't.
+ """
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+ s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
+ s1 >> w1 >> t1
+ s1 >> w2
+ self.cleared_upstream(w2) == {s1, w2, t1}
+
+ def clearing_teardown_no_clear_setup(self):
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+ s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
+ s1 >> t1
+ # clearing t1 does not clear s1
+ self.cleared_downstream(t1) == {t1}
+ s1 >> w1 >> t1
+ # that isn't changed with the introduction of w1
+ self.cleared_downstream(t1) == {t1}
+ # though, of course, clearing w1 clears them all
+ self.cleared_downstream(w1) == {s1, w1, t1}