This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 566bc1b68b Fix tests and add logic to handle clearing setup directly (#32430)
566bc1b68b is described below
commit 566bc1b68b4e1643761b4e8518e5e556b8e6e82c
Author: Daniel Standish <[email protected]>
AuthorDate: Fri Jul 7 15:52:14 2023 -0700
Fix tests and add logic to handle clearing setup directly (#32430)
---
airflow/models/dag.py | 2 ++
tests/models/test_dag.py | 77 +++++++++++++++++++++++++++++++++++++++++++-----
2 files changed, 72 insertions(+), 7 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 1ec78b64c3..234db54b15 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2373,6 +2373,8 @@ class DAG(LoggingMixin):
also_include.extend(t.get_upstreams_follow_setups())
else:
also_include.extend(t.get_upstreams_only_setups_and_teardowns())
+ if t.is_setup and not include_downstream:
+ also_include.extend(x for x in t.downstream_list if
x.is_teardown)
direct_upstreams: list[Operator] = []
if include_direct_upstream:
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index bd460e6cc7..e4a3290a5d 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3590,12 +3590,23 @@ class TestTaskClearingSetupTeardownBehavior:
upstream = True
return set(
task.dag.partial_subset(
- task_ids_or_regex=[task.task_id],
+ task_ids_or_regex=task.task_id,
include_downstream=not upstream,
include_upstream=upstream,
).tasks
)
+ @staticmethod
+ def cleared_neither(task):
+ """Helper to return tasks that would be cleared if neither **upstream** nor **downstream** is selected."""
+ return set(
+ task.dag.partial_subset(
+ task_ids_or_regex=[task.task_id],
+ include_downstream=False,
+ include_upstream=False,
+ ).tasks
+ )
+
def test_get_flat_relative_ids_with_setup(self):
with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1")
@@ -3823,18 +3834,70 @@ class TestTaskClearingSetupTeardownBehavior:
"""
with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
- s1 >> w1 >> t1
+ s1 >> w1 >> t1.as_teardown(setups=s1)
s1 >> w2
- self.cleared_upstream(w2) == {s1, w2, t1}
+ # w2 is downstream of s1, so when clearing upstream, it should clear s1 (since it
+ # is upstream of w2) and t1 since it's the teardown for s1 even though not downstream of w2
+ assert self.cleared_upstream(w2) == {s1, w2, t1}
- def clearing_teardown_no_clear_setup(self):
+ def test_clearing_teardown_no_clear_setup(self):
with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
s1 >> t1
# clearing t1 does not clear s1
- self.cleared_downstream(t1) == {t1}
+ assert self.cleared_downstream(t1) == {t1}
s1 >> w1 >> t1
# that isn't changed with the introduction of w1
- self.cleared_downstream(t1) == {t1}
+ assert self.cleared_downstream(t1) == {t1}
# though, of course, clearing w1 clears them all
- self.cleared_downstream(w1) == {s1, w1, t1}
+ assert self.cleared_downstream(w1) == {s1, w1, t1}
+
+ def test_clearing_setup_clears_teardown(self):
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+ s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
+ s1 >> t1
+ s1 >> w1 >> t1
+ # clearing w1 clears all always
+ assert self.cleared_upstream(w1) == {s1, w1, t1}
+ assert self.cleared_downstream(w1) == {s1, w1, t1}
+ assert self.cleared_neither(w1) == {s1, w1, t1}
+ # clearing s1 clears t1 always
+ assert self.cleared_upstream(s1) == {s1, t1}
+ assert self.cleared_downstream(s1) == {s1, w1, t1}
+ assert self.cleared_neither(s1) == {s1, t1}
+
+ @pytest.mark.parametrize(
+ "upstream, downstream, expected",
+ [
+ (False, False, {"my_teardown", "my_setup"}),
+ (False, True, {"my_setup", "my_work", "my_teardown"}),
+ (True, False, {"my_teardown", "my_setup"}),
+ (True, True, {"my_setup", "my_work", "my_teardown"}),
+ ],
+ )
+ def test_clearing_setup_clears_teardown_taskflow(self, upstream,
downstream, expected):
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+
+ @setup
+ def my_setup():
+ ...
+
+ @task_decorator
+ def my_work():
+ ...
+
+ @teardown
+ def my_teardown():
+ ...
+
+ s1 = my_setup()
+ w1 = my_work()
+ t1 = my_teardown()
+ s1 >> w1 >> t1
+ s1 >> t1
+ assert {
+ x.task_id
+ for x in dag.partial_subset(
+ "my_setup", include_upstream=upstream,
include_downstream=downstream
+ ).tasks
+ } == expected