This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a2eaca8977 Allow setup without teardown (#32679)
a2eaca8977 is described below
commit a2eaca897734ac43d5263832b319c99b3510b7b5
Author: Daniel Standish <[email protected]>
AuthorDate: Wed Jul 19 11:54:58 2023 -0700
Allow setup without teardown (#32679)
---
airflow/models/abstractoperator.py | 7 +++-
airflow/models/dag.py | 4 --
tests/models/test_dag.py | 86 +++++++++++---------------------------
3 files changed, 29 insertions(+), 68 deletions(-)
diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py
index 9a8f88ce7d..0c6d89fff3 100644
--- a/airflow/models/abstractoperator.py
+++ b/airflow/models/abstractoperator.py
@@ -302,7 +302,8 @@ class AbstractOperator(Templater, DAGNode):
This method is meant to be used when we are clearing the task (non-upstream) and we need
to add in the *relevant* setups and their teardowns.
- Relevant in this case means, the setup has a teardown that is downstream of ``self``.
+ Relevant in this case means, the setup has a teardown that is downstream of ``self``,
+ or the setup has no teardowns.
"""
downstream_teardown_ids = {
x.task_id for x in self.get_flat_relatives(upstream=False) if x.is_teardown
@@ -310,7 +311,9 @@ class AbstractOperator(Templater, DAGNode):
for task in self.get_flat_relatives(upstream=True):
if not task.is_setup:
continue
- if not task.downstream_task_ids.isdisjoint(downstream_teardown_ids):
+ has_no_teardowns = not any(True for x in task.downstream_list if x.is_teardown)
+ # if task has no teardowns or has teardowns downstream of self
+ if has_no_teardowns or task.downstream_task_ids.intersection(downstream_teardown_ids):
yield task
for t in task.downstream_list:
if t.is_teardown and not t == self:
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 391be9e582..1d380274f6 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -714,10 +714,6 @@ class DAG(LoggingMixin):
:meta private:
"""
for task in self.tasks:
- if task.is_setup and not any(x.is_teardown for x in task.downstream_list):
- raise AirflowDagInconsistent(
- f"Dag has setup without teardown: dag='{self.dag_id}', task='{task.task_id}'"
- )
if task.is_teardown and all(x.is_setup for x in task.upstream_list):
raise AirflowDagInconsistent(
f"Dag has teardown task without an upstream work task: dag='{self.dag_id}',"
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 14527fd7fa..494c7c7c66 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3623,39 +3623,46 @@ class TestTaskClearingSetupTeardownBehavior:
def test_get_flat_relative_ids_with_setup(self):
with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
- s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1")
+ s1, w1, w2, w3, w4, t1 = self.make_tasks(dag, "s1, w1, w2, w3, w4, t1")
s1 >> w1 >> w2 >> w3
- # there is no teardown downstream of w1, so we assume w1 does not need s1
- assert set(w1.get_upstreams_only_setups_and_teardowns()) == set()
+ # w1 is downstream of s1, and s1 has no teardown, so clearing w1 clears s1
+ assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1}
# same with w2 and w3
- assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
- assert set(w3.get_upstreams_only_setups_and_teardowns()) == set()
- assert self.cleared_downstream(w2) == {w2, w3}
+ assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1}
+ assert set(w3.get_upstreams_only_setups_and_teardowns()) == {s1}
+ # so if we clear w2, we should also get s1, and w3, but not w1
+ assert self.cleared_downstream(w2) == {s1, w2, w3}
w3 >> t1
# now, w2 has a downstream teardown, but it's not connected directly to s1
- # (this is how we signal "this is the teardown for this setup")
- # so still, we don't regard s1 as a setup for w2
- assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
- assert self.cleared_downstream(w2) == {w2, w3, t1}
+ assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1}
+ # so if we clear downstream then s1 will be cleared, and t1 will be cleared but only by virtue of
+ # being downstream of w2 -- not as a result of being the teardown for s1, which it ain't
+ assert self.cleared_downstream(w2) == {s1, w2, w3, t1}
+ # and, another consequence of not linking s1 and t1 is that when we clear upstream, note that
+ # t1 doesn't get cleared -- cus it's not upstream and it's not linked to s1
+ assert self.cleared_upstream(w2) == {s1, w1, w2}
+ # note also that if we add a 4th work task after t1, it will still be "in scope" for s1
+ t1 >> w4
+ assert self.cleared_downstream(w4) == {s1, w4}
s1 >> t1
- # now, we know that t1 is the teardown for s1, and it's downstream of
- # w2, so we can infer that w2 requires it, so now when we clear w2,
- # we will get s1 (because it's a setup for w2) and t1 (because
- # it is a teardown for s1)
+ # now, we know that t1 is the teardown for s1, so now we know that s1 will be "torn down"
+ # by the time w4 runs, so we now know that w4 no longer requires s1, so when we clear w4,
+ # s1 will not also be cleared
+ self.cleared_downstream(w4) == {w4}
assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
- assert self.cleared_downstream(w1) == {s1, w1, w2, w3, t1}
+ assert self.cleared_downstream(w1) == {s1, w1, w2, w3, t1, w4}
assert self.cleared_upstream(w1) == {s1, w1, t1}
assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
assert set(w2.get_upstreams_follow_setups()) == {s1, w1, t1}
- assert self.cleared_downstream(w2) == {s1, w2, w3, t1}
+ assert self.cleared_downstream(w2) == {s1, w2, w3, t1, w4}
assert self.cleared_upstream(w2) == {s1, w1, w2, t1}
- assert self.cleared_downstream(w3) == {s1, w3, t1}
+ assert self.cleared_downstream(w3) == {s1, w3, t1, w4}
assert self.cleared_upstream(w3) == {s1, w1, w2, w3, t1}
def test_get_flat_relative_ids_with_setup_nested_ctx_mgr(self):
@@ -3695,32 +3702,6 @@ class TestTaskClearingSetupTeardownBehavior:
# t1 not included because t1 is not downstream
assert self.cleared_downstream(w3) == {s2, w3, t2}
- def test_setup_without_teardown(self):
- """A setup needs a teardown to define its scope."""
- with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
- s1, w1, t1 = self.make_tasks(dag, "s1, w1, t1")
- # s1 has no teardown: fail
- with pytest.raises(AirflowDagInconsistent):
- dag.validate_setup_teardown()
-
- s1 >> w1
- # w1 depends on s1 but not as a "setup" per se, since s1 doesn't have a teardown to define
- # its scope
- with pytest.raises(AirflowDagInconsistent):
- dag.validate_setup_teardown()
-
- w1 >> t1
- # now t1 is technically downstream of s1, but we still must wire it up explicitly
- # to define the setup/teardown relationship
- with pytest.raises(AirflowDagInconsistent):
- dag.validate_setup_teardown()
-
- s1 >> t1
- # now, s1 and t1 are linked as setups and teardowns
- # anything upstream of t1 and downstream of s1 is in the scope for s1
- # so now this passes validation
- dag.validate_setup_teardown()
-
def test_get_flat_relative_ids_follows_teardowns(self):
with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
@@ -3918,25 +3899,6 @@ class TestTaskClearingSetupTeardownBehavior:
def test_validate_setup_teardown_dag(self, dag_maker):
"""Test some invalid setups and teardowns in a dag"""
- with dag_maker("test_dag") as dag:
- s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
-
- with s1:
- w1 >> t1
- w2 >> t1
- with pytest.raises(
- AirflowDagInconsistent, match="Dag has setup without teardown: dag='test_dag', task='s1'"
- ):
- dag.validate()
-
- with dag_maker("test_dag") as dag:
- s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
- s1 >> w1 >> w2 >> t1
- with pytest.raises(
- AirflowDagInconsistent, match="Dag has setup without teardown: dag='test_dag', task='s1'"
- ):
- dag.validate()
-
with dag_maker("test_dag") as dag:
s1, w1, w2, t1 = self.make_tasks(dag, "s1, w1, w2, t1")
w1 >> w2