This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 45713446f3 Fix clearing behavior for downstream work task with
non-collinear setup task (#33358)
45713446f3 is described below
commit 45713446f37ee4b1ee972ab8b5aa1ac0b2482197
Author: Daniel Standish <[email protected]>
AuthorDate: Sun Aug 13 05:56:01 2023 -0700
Fix clearing behavior for downstream work task with non-collinear setup
task (#33358)
* Fix clearing behavior for downstream work task with non-collinear setup task
With this kind of DAG, if you clear w1 together with its downstream tasks, w2 is cleared as well:
```python
s1 >> w1 >> [w2, t1]
s1 >> t1
s2 >> t2
s2 >> w2 >> t2
```
We need to make sure that the setup for w2 (s2) also gets cleared. However, to
avoid recursing to arbitrary depth for setups of setups, we simply say that a
setup cannot have a setup. A setup can *come after* another setup, but the
earlier setup is not treated as a setup *for* the later one (this only affects
clearing behavior).
* fixup
* Teardowns cannot have a setup or teardown either
---
airflow/models/dag.py | 20 +++++--
tests/models/test_dag.py | 137 ++++++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 150 insertions(+), 7 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 28dfca48eb..36555ce10f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2356,17 +2356,27 @@ class DAG(LoggingMixin):
else:
matched_tasks = [t for t in self.tasks if t.task_id in
task_ids_or_regex]
- also_include: list[Operator] = []
+ also_include_ids: set[str] = set()
for t in matched_tasks:
if include_downstream:
- also_include.extend(t.get_flat_relatives(upstream=False))
+ for rel in t.get_flat_relatives(upstream=False):
+ also_include_ids.add(rel.task_id)
+ if rel not in matched_tasks: # if it's in there, we're
already processing it
+ # need to include setups and teardowns for tasks that
are in multiple
+ # non-collinear setup/teardown paths
+ if not rel.is_setup and not rel.is_teardown:
+ also_include_ids.update(
+ x.task_id for x in
rel.get_upstreams_only_setups_and_teardowns()
+ )
if include_upstream:
- also_include.extend(t.get_upstreams_follow_setups())
+ also_include_ids.update(x.task_id for x in
t.get_upstreams_follow_setups())
else:
-
also_include.extend(t.get_upstreams_only_setups_and_teardowns())
+ if not t.is_setup and not t.is_teardown:
+ also_include_ids.update(x.task_id for x in
t.get_upstreams_only_setups_and_teardowns())
if t.is_setup and not include_downstream:
- also_include.extend(x for x in t.downstream_list if
x.is_teardown)
+ also_include_ids.update(x.task_id for x in t.downstream_list
if x.is_teardown)
+ also_include: list[Operator] = [self.task_dict[x] for x in
also_include_ids]
direct_upstreams: list[Operator] = []
if include_direct_upstream:
for t in itertools.chain(matched_tasks, also_include):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index cd296c0ddc..b6827fa541 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3730,7 +3730,9 @@ class TestTaskClearingSetupTeardownBehavior:
s2 >> w2 >> t2
assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
- assert self.cleared_downstream(w1) == {s1, w1, t1, w2, t2}
+ # s2 is included because w2 is included
+ assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2}
+ assert self.cleared_neither(w1) == {s1, w1, t1}
assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2}
assert self.cleared_downstream(w2) == {s2, w2, t2}
@@ -3750,7 +3752,9 @@ class TestTaskClearingSetupTeardownBehavior:
s3 >> w2 >> [t3a, t3b]
s3 >> [t3a, t3b]
assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1a, s1b,
t1}
- assert self.cleared_downstream(w1) == {s1a, s1b, w1, t1, t3a, t3b, w2,
t2}
+ # since w2 is downstream of w1, w2 gets cleared.
+ # and since w2 gets cleared, we should also see s2 and s3 in here
+ assert self.cleared_downstream(w1) == {s1a, s1b, w1, t1, s3, t3a, t3b,
w2, s2, t2}
assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2,
s3, t3a, t3b}
assert self.cleared_downstream(w2) == {s2, s3, w2, t2, t3a, t3b}
@@ -3897,3 +3901,132 @@ class TestTaskClearingSetupTeardownBehavior:
"my_setup", include_upstream=upstream,
include_downstream=downstream
).tasks
} == expected
+
+ def test_get_flat_relative_ids_two_tasks_diff_setup_teardowns_deeper(self):
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+ s1, t1, s2, t2, w1, w2, s3, w3, t3 = self.make_tasks(dag, "s1, t1,
s2, t2, w1, w2, s3, w3, t3")
+ s1 >> w1 >> t1
+ s1 >> t1
+ w1 >> w2
+
+ # with the below, s2 is not downstream of w1, but it's the setup for w2
+ # so it should be cleared when w1 is cleared
+ s2 >> w2 >> t2
+ s2 >> t2
+
+ assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
+ assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2}
+ assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2}
+ assert self.cleared_downstream(w2) == {s2, w2, t2}
+
+ # now, what if s2 itself has a setup and teardown?
+ s3 >> s2 >> t3
+ s3 >> t3
+ # note that s3 is excluded because it's assumed that a setup won't
have a setup
+ # so, we don't continue to recurse for setups after reaching the
setups for
+ # the downstream work tasks
+ # but, t3 is included since it's a teardown for s2
+ assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2, t3}
+
+ def test_clearing_behavior_multiple_setups_for_work_task(self):
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+ s1, t1, s2, t2, w1, w2, s3, w3, t3 = self.make_tasks(dag, "s1, t1,
s2, t2, w1, w2, s3, w3, t3")
+ s1 >> t1
+ s2 >> t2
+ s3 >> t3
+ s1 >> s2 >> s3 >> w1 >> w2 >> [t1, t2, t3]
+
+ assert self.cleared_downstream(w1) == {s1, s2, s3, w1, w2, t1, t2, t3}
+ assert self.cleared_downstream(w2) == {s1, s2, s3, w2, t1, t2, t3}
+ assert self.cleared_downstream(s3) == {s1, s2, s3, w1, w2, t1, t2, t3}
+ # even if we don't include upstream / downstream, setups and teardowns
are cleared
+ assert self.cleared_neither(w2) == {s3, t3, s2, t2, s1, t1, w2}
+ assert self.cleared_neither(w1) == {s3, t3, s2, t2, s1, t1, w1}
+ # but, a setup doesn't formally have a setup, so if we only clear s3,
say then its upstream setups
+ # are not also cleared
+ assert self.cleared_neither(s3) == {s3, t3}
+ assert self.cleared_neither(s2) == {s2, t2}
+
+ def test_clearing_behavior_multiple_setups_for_work_task2(self):
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+ s1, t1, s2, t2, w1, w2, s3, w3, t3 = self.make_tasks(dag, "s1, t1,
s2, t2, w1, w2, s3, w3, t3")
+ s1 >> t1
+ s2 >> t2
+ s3 >> t3
+ [s1, s2, s3] >> w1 >> w2 >> [t1, t2, t3]
+
+ assert self.cleared_downstream(w1) == {s1, s2, s3, w1, w2, t1, t2, t3}
+ assert self.cleared_downstream(w2) == {s1, s2, s3, w2, t1, t2, t3}
+
+ def test_clearing_behavior_more_tertiary_weirdness(self):
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+ s1, t1, s2, t2, w1, w2, s3, t3 = self.make_tasks(dag, "s1, t1, s2,
t2, w1, w2, s3, t3")
+ s1 >> t1
+ s2 >> t2
+ s1 >> w1 >> s2 >> w2 >> [t1, t2]
+ s2 >> w2 >> t2
+ s3 >> s2 >> t3
+ s3 >> t3
+
+ def sort(task_list):
+ return sorted(x.task_id for x in task_list)
+
+ assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
+ # s2 is included because w2 is included
+ assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2, t3}
+ assert self.cleared_downstream(w2) == {s1, t1, s2, w2, t2, t3}
+ # t3 is included since s2 is included and s2 >> t3
+ # but s3 not included because it's assumed that a setup doesn't have a
setup
+ assert self.cleared_neither(w2) == {s1, w2, t1, s2, t2, t3}
+
+ # since we're clearing upstream, s3 is upstream of w2, so s3 and t3
are included
+ # even though w2 doesn't require them
+ # s2 and t2 are included for obvious reasons, namely that w2 requires
s2
+ # and s1 and t1 are included for the same reason
+ # w1 included since it is upstream of w2
+ assert sort(self.cleared_upstream(w2)) == sort({s1, t1, s2, t2, s3,
t3, w1, w2})
+
+ # t3 is included here since it's a teardown for s2
+ assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2,
s1, t1, t3}
+
+ def test_clearing_behavior_more_tertiary_weirdness2(self):
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+ s1, t1, s2, t2, w1, w2, s3, t3 = self.make_tasks(dag, "s1, t1, s2,
t2, w1, w2, s3, t3")
+ s1 >> t1
+ s2 >> t2
+ s1 >> w1 >> t1
+ s2 >> t1 >> t2
+
+ def sort(task_list):
+ return sorted(x.task_id for x in task_list)
+
+ # t2 included since downstream, but s2 not included since it's not
required by t2
+ # and clearing teardown does not clear the setup
+ assert self.cleared_downstream(w1) == {s1, w1, t1, t2}
+
+ # even though t1 is cleared here, s2 and t2 are not "setup and
teardown" for t1
+ # so they are not included
+ assert self.cleared_neither(w1) == {s1, w1, t1}
+ assert self.cleared_upstream(w1) == {s1, w1, t1}
+
+ # t1 does not have a setup or teardown
+ # but t2 is downstream so it's included
+ # and s2 is not included since clearing teardown does not clear the
setup
+ assert self.cleared_downstream(t1) == {t1, t2}
+ # t1 does not have a setup or teardown
+ assert self.cleared_neither(t1) == {t1}
+ # s2 included since upstream, and t2 included since s2 included
+ assert self.cleared_upstream(t1) == {s1, t1, s2, t2, w1}
+
+ def test_clearing_behavior_just_teardown(self):
+ with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+ s1, t1 = self.make_tasks(dag, "s1, t1")
+ s1 >> t1
+ assert set(t1.get_upstreams_only_setups_and_teardowns()) == set()
+ assert self.cleared_upstream(t1) == {s1, t1}
+ assert self.cleared_downstream(t1) == {t1}
+ assert self.cleared_neither(t1) == {t1}
+ assert set(s1.get_upstreams_only_setups_and_teardowns()) == set()
+ assert self.cleared_upstream(s1) == {s1, t1}
+ assert self.cleared_downstream(s1) == {s1, t1}
+ assert self.cleared_neither(s1) == {s1, t1}