This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-7-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1b0b952bb37384e3318f8bf4d97c47c5ed7cd79c
Author: Daniel Standish <[email protected]>
AuthorDate: Sun Aug 13 05:56:01 2023 -0700

Fix clearing behavior for downstream work task with non-collinear setup task (#33358)

* Fix clearing behavior for downstream work task with non-collinear setup

With this kind of DAG, if you clear w1 downstream, then you also clear w2:

```python
s1 >> w1 >> [w2, t1]
s1 >> t1
s2 >> t2
s2 >> w2 >> t2
```

We need to make sure that the setup for w2 also gets cleared. But to avoid the need to recurse to arbitrary depth for setups of setups, let's just say that a setup cannot have a setup. A setup can *come after* another setup, but it won't *be* a setup for that setup (and what's at stake is just the clearing behavior).

* fixup

* teardowns can't have setup / teardown either

(cherry picked from commit 45713446f37ee4b1ee972ab8b5aa1ac0b2482197)
--- airflow/models/dag.py | 20 +++++-- tests/models/test_dag.py | 137 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 150 insertions(+), 7 deletions(-) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 1cb9220e58..d4ff29db2f 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2356,17 +2356,27 @@ class DAG(LoggingMixin): else: matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex] - also_include: list[Operator] = [] + also_include_ids: set[str] = set() for t in matched_tasks: if include_downstream: - also_include.extend(t.get_flat_relatives(upstream=False)) + for rel in t.get_flat_relatives(upstream=False): + also_include_ids.add(rel.task_id) + if rel not in matched_tasks: # if it's in there, we're already processing it + # need to include setups and teardowns for tasks that are in multiple + # non-collinear setup/teardown paths + if not rel.is_setup and not rel.is_teardown: + also_include_ids.update( + x.task_id for x in rel.get_upstreams_only_setups_and_teardowns() + ) if include_upstream: -
also_include.extend(t.get_upstreams_follow_setups()) + also_include_ids.update(x.task_id for x in t.get_upstreams_follow_setups()) else: - also_include.extend(t.get_upstreams_only_setups_and_teardowns()) + if not t.is_setup and not t.is_teardown: + also_include_ids.update(x.task_id for x in t.get_upstreams_only_setups_and_teardowns()) if t.is_setup and not include_downstream: - also_include.extend(x for x in t.downstream_list if x.is_teardown) + also_include_ids.update(x.task_id for x in t.downstream_list if x.is_teardown) + also_include: list[Operator] = [self.task_dict[x] for x in also_include_ids] direct_upstreams: list[Operator] = [] if include_direct_upstream: for t in itertools.chain(matched_tasks, also_include): diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index cd296c0ddc..b6827fa541 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -3730,7 +3730,9 @@ class TestTaskClearingSetupTeardownBehavior: s2 >> w2 >> t2 assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1} - assert self.cleared_downstream(w1) == {s1, w1, t1, w2, t2} + # s2 is included because w2 is included + assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2} + assert self.cleared_neither(w1) == {s1, w1, t1} assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2} assert self.cleared_downstream(w2) == {s2, w2, t2} @@ -3750,7 +3752,9 @@ class TestTaskClearingSetupTeardownBehavior: s3 >> w2 >> [t3a, t3b] s3 >> [t3a, t3b] assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1a, s1b, t1} - assert self.cleared_downstream(w1) == {s1a, s1b, w1, t1, t3a, t3b, w2, t2} + # since w2 is downstream of w1, w2 gets cleared. 
+ # and since w2 gets cleared, we should also see s2 and s3 in here + assert self.cleared_downstream(w1) == {s1a, s1b, w1, t1, s3, t3a, t3b, w2, s2, t2} assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2, s3, t3a, t3b} assert self.cleared_downstream(w2) == {s2, s3, w2, t2, t3a, t3b} @@ -3897,3 +3901,132 @@ class TestTaskClearingSetupTeardownBehavior: "my_setup", include_upstream=upstream, include_downstream=downstream ).tasks } == expected + + def test_get_flat_relative_ids_two_tasks_diff_setup_teardowns_deeper(self): + with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag: + s1, t1, s2, t2, w1, w2, s3, w3, t3 = self.make_tasks(dag, "s1, t1, s2, t2, w1, w2, s3, w3, t3") + s1 >> w1 >> t1 + s1 >> t1 + w1 >> w2 + + # with the below, s2 is not downstream of w1, but it's the setup for w2 + # so it should be cleared when w1 is cleared + s2 >> w2 >> t2 + s2 >> t2 + + assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1} + assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2} + assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2} + assert self.cleared_downstream(w2) == {s2, w2, t2} + + # now, what if s2 itself has a setup and teardown? 
+ s3 >> s2 >> t3 + s3 >> t3 + # note that s3 is excluded because it's assumed that a setup won't have a setup + # so, we don't continue to recurse for setups after reaching the setups for + # the downstream work tasks + # but, t3 is included since it's a teardown for s2 + assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2, t3} + + def test_clearing_behavior_multiple_setups_for_work_task(self): + with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag: + s1, t1, s2, t2, w1, w2, s3, w3, t3 = self.make_tasks(dag, "s1, t1, s2, t2, w1, w2, s3, w3, t3") + s1 >> t1 + s2 >> t2 + s3 >> t3 + s1 >> s2 >> s3 >> w1 >> w2 >> [t1, t2, t3] + + assert self.cleared_downstream(w1) == {s1, s2, s3, w1, w2, t1, t2, t3} + assert self.cleared_downstream(w2) == {s1, s2, s3, w2, t1, t2, t3} + assert self.cleared_downstream(s3) == {s1, s2, s3, w1, w2, t1, t2, t3} + # even if we don't include upstream / downstream, setups and teardowns are cleared + assert self.cleared_neither(w2) == {s3, t3, s2, t2, s1, t1, w2} + assert self.cleared_neither(w1) == {s3, t3, s2, t2, s1, t1, w1} + # but, a setup doesn't formally have a setup, so if we only clear s3, say then its upstream setups + # are not also cleared + assert self.cleared_neither(s3) == {s3, t3} + assert self.cleared_neither(s2) == {s2, t2} + + def test_clearing_behavior_multiple_setups_for_work_task2(self): + with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag: + s1, t1, s2, t2, w1, w2, s3, w3, t3 = self.make_tasks(dag, "s1, t1, s2, t2, w1, w2, s3, w3, t3") + s1 >> t1 + s2 >> t2 + s3 >> t3 + [s1, s2, s3] >> w1 >> w2 >> [t1, t2, t3] + + assert self.cleared_downstream(w1) == {s1, s2, s3, w1, w2, t1, t2, t3} + assert self.cleared_downstream(w2) == {s1, s2, s3, w2, t1, t2, t3} + + def test_clearing_behavior_more_tertiary_weirdness(self): + with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag: + s1, t1, s2, t2, w1, w2, s3, t3 = self.make_tasks(dag, "s1, t1, s2, t2, w1, w2, s3, t3") + s1 >> t1 + s2 >> t2 + s1 >> 
w1 >> s2 >> w2 >> [t1, t2] + s2 >> w2 >> t2 + s3 >> s2 >> t3 + s3 >> t3 + + def sort(task_list): + return sorted(x.task_id for x in task_list) + + assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1} + # s2 is included because w2 is included + assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2, t3} + assert self.cleared_downstream(w2) == {s1, t1, s2, w2, t2, t3} + # t3 is included since s2 is included and s2 >> t3 + # but s3 not included because it's assumed that a setup doesn't have a setup + assert self.cleared_neither(w2) == {s1, w2, t1, s2, t2, t3} + + # since we're clearing upstream, s3 is upstream of w2, so s3 and t3 are included + # even though w2 doesn't require them + # s2 and t2 are included for obvious reasons, namely that w2 requires s2 + # and s1 and t1 are included for the same reason + # w1 included since it is upstream of w2 + assert sort(self.cleared_upstream(w2)) == sort({s1, t1, s2, t2, s3, t3, w1, w2}) + + # t3 is included here since it's a teardown for s2 + assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2, s1, t1, t3} + + def test_clearing_behavior_more_tertiary_weirdness2(self): + with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag: + s1, t1, s2, t2, w1, w2, s3, t3 = self.make_tasks(dag, "s1, t1, s2, t2, w1, w2, s3, t3") + s1 >> t1 + s2 >> t2 + s1 >> w1 >> t1 + s2 >> t1 >> t2 + + def sort(task_list): + return sorted(x.task_id for x in task_list) + + # t2 included since downstream, but s2 not included since it's not required by t2 + # and clearing teardown does not clear the setup + assert self.cleared_downstream(w1) == {s1, w1, t1, t2} + + # even though t1 is cleared here, s2 and t2 are not "setup and teardown" for t1 + # so they are not included + assert self.cleared_neither(w1) == {s1, w1, t1} + assert self.cleared_upstream(w1) == {s1, w1, t1} + + # t1 does not have a setup or teardown + # but t2 is downstream so it's included + # and s2 is not included since clearing teardown does not 
clear the setup + assert self.cleared_downstream(t1) == {t1, t2} + # t1 does not have a setup or teardown + assert self.cleared_neither(t1) == {t1} + # s2 included since upstream, and t2 included since s2 included + assert self.cleared_upstream(t1) == {s1, t1, s2, t2, w1} + + def test_clearing_behavior_just_teardown(self): + with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag: + s1, t1 = self.make_tasks(dag, "s1, t1") + s1 >> t1 + assert set(t1.get_upstreams_only_setups_and_teardowns()) == set() + assert self.cleared_upstream(t1) == {s1, t1} + assert self.cleared_downstream(t1) == {t1} + assert self.cleared_neither(t1) == {t1} + assert set(s1.get_upstreams_only_setups_and_teardowns()) == set() + assert self.cleared_upstream(s1) == {s1, t1} + assert self.cleared_downstream(s1) == {s1, t1} + assert self.cleared_neither(s1) == {s1, t1}
