This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 45713446f3 Fix clearing behavior for downstream work task with non-collinear setup task (#33358)
45713446f3 is described below

commit 45713446f37ee4b1ee972ab8b5aa1ac0b2482197
Author: Daniel Standish <[email protected]>
AuthorDate: Sun Aug 13 05:56:01 2023 -0700

    Fix clearing behavior for downstream work task with non-collinear setup task (#33358)
    
    * Fix clearing behavior for downstream work task with non-collinear setup
    
    With this kind of DAG, if you clear w1 with downstream tasks included, then w2 is also cleared:
    
    ```python
        s1 >> w1 >> [w2, t1]
        s1 >> t1
        s2 >> t2
        s2 >> w2 >> t2
    ```
    
    We need to make sure that the setup for w2 (here, s2) also gets cleared, even though s2 is not itself downstream of w1.  But, to avoid the need to recurse to arbitrary depth for setups of setups, let's just say that a setup cannot have a setup.  A setup can *come after* another setup, but the earlier one won't be treated as a setup *for* the later one (and what's at stake is just the clearing behavior).
    
    * fixup
    
    * teardowns can't have setup / teardown either
---
 airflow/models/dag.py    |  20 +++++--
 tests/models/test_dag.py | 137 ++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 150 insertions(+), 7 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 28dfca48eb..36555ce10f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -2356,17 +2356,27 @@ class DAG(LoggingMixin):
         else:
             matched_tasks = [t for t in self.tasks if t.task_id in 
task_ids_or_regex]
 
-        also_include: list[Operator] = []
+        also_include_ids: set[str] = set()
         for t in matched_tasks:
             if include_downstream:
-                also_include.extend(t.get_flat_relatives(upstream=False))
+                for rel in t.get_flat_relatives(upstream=False):
+                    also_include_ids.add(rel.task_id)
+                    if rel not in matched_tasks:  # if it's in there, we're 
already processing it
+                        # need to include setups and teardowns for tasks that 
are in multiple
+                        # non-collinear setup/teardown paths
+                        if not rel.is_setup and not rel.is_teardown:
+                            also_include_ids.update(
+                                x.task_id for x in 
rel.get_upstreams_only_setups_and_teardowns()
+                            )
             if include_upstream:
-                also_include.extend(t.get_upstreams_follow_setups())
+                also_include_ids.update(x.task_id for x in 
t.get_upstreams_follow_setups())
             else:
-                
also_include.extend(t.get_upstreams_only_setups_and_teardowns())
+                if not t.is_setup and not t.is_teardown:
+                    also_include_ids.update(x.task_id for x in 
t.get_upstreams_only_setups_and_teardowns())
             if t.is_setup and not include_downstream:
-                also_include.extend(x for x in t.downstream_list if 
x.is_teardown)
+                also_include_ids.update(x.task_id for x in t.downstream_list 
if x.is_teardown)
 
+        also_include: list[Operator] = [self.task_dict[x] for x in 
also_include_ids]
         direct_upstreams: list[Operator] = []
         if include_direct_upstream:
             for t in itertools.chain(matched_tasks, also_include):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index cd296c0ddc..b6827fa541 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -3730,7 +3730,9 @@ class TestTaskClearingSetupTeardownBehavior:
         s2 >> w2 >> t2
 
         assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
-        assert self.cleared_downstream(w1) == {s1, w1, t1, w2, t2}
+        # s2 is included because w2 is included
+        assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2}
+        assert self.cleared_neither(w1) == {s1, w1, t1}
         assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2}
         assert self.cleared_downstream(w2) == {s2, w2, t2}
 
@@ -3750,7 +3752,9 @@ class TestTaskClearingSetupTeardownBehavior:
         s3 >> w2 >> [t3a, t3b]
         s3 >> [t3a, t3b]
         assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1a, s1b, 
t1}
-        assert self.cleared_downstream(w1) == {s1a, s1b, w1, t1, t3a, t3b, w2, 
t2}
+        # since w2 is downstream of w1, w2 gets cleared.
+        # and since w2 gets cleared, we should also see s2 and s3 in here
+        assert self.cleared_downstream(w1) == {s1a, s1b, w1, t1, s3, t3a, t3b, 
w2, s2, t2}
         assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2, 
s3, t3a, t3b}
         assert self.cleared_downstream(w2) == {s2, s3, w2, t2, t3a, t3b}
 
@@ -3897,3 +3901,132 @@ class TestTaskClearingSetupTeardownBehavior:
                 "my_setup", include_upstream=upstream, 
include_downstream=downstream
             ).tasks
         } == expected
+
+    def test_get_flat_relative_ids_two_tasks_diff_setup_teardowns_deeper(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, t1, s2, t2, w1, w2, s3, w3, t3 = self.make_tasks(dag, "s1, t1, 
s2, t2, w1, w2, s3, w3, t3")
+        s1 >> w1 >> t1
+        s1 >> t1
+        w1 >> w2
+
+        # with the below, s2 is not downstream of w1, but it's the setup for w2
+        # so it should be cleared when w1 is cleared
+        s2 >> w2 >> t2
+        s2 >> t2
+
+        assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
+        assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2}
+        assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2}
+        assert self.cleared_downstream(w2) == {s2, w2, t2}
+
+        # now, what if s2 itself has a setup and teardown?
+        s3 >> s2 >> t3
+        s3 >> t3
+        # note that s3 is excluded because it's assumed that a setup won't 
have a setup
+        # so, we don't continue to recurse for setups after reaching the 
setups for
+        # the downstream work tasks
+        # but, t3 is included since it's a teardown for s2
+        assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2, t3}
+
+    def test_clearing_behavior_multiple_setups_for_work_task(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, t1, s2, t2, w1, w2, s3, w3, t3 = self.make_tasks(dag, "s1, t1, 
s2, t2, w1, w2, s3, w3, t3")
+        s1 >> t1
+        s2 >> t2
+        s3 >> t3
+        s1 >> s2 >> s3 >> w1 >> w2 >> [t1, t2, t3]
+
+        assert self.cleared_downstream(w1) == {s1, s2, s3, w1, w2, t1, t2, t3}
+        assert self.cleared_downstream(w2) == {s1, s2, s3, w2, t1, t2, t3}
+        assert self.cleared_downstream(s3) == {s1, s2, s3, w1, w2, t1, t2, t3}
+        # even if we don't include upstream / downstream, setups and teardowns 
are cleared
+        assert self.cleared_neither(w2) == {s3, t3, s2, t2, s1, t1, w2}
+        assert self.cleared_neither(w1) == {s3, t3, s2, t2, s1, t1, w1}
+        # but, a setup doesn't formally have a setup, so if we only clear s3, 
say then its upstream setups
+        # are not also cleared
+        assert self.cleared_neither(s3) == {s3, t3}
+        assert self.cleared_neither(s2) == {s2, t2}
+
+    def test_clearing_behavior_multiple_setups_for_work_task2(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, t1, s2, t2, w1, w2, s3, w3, t3 = self.make_tasks(dag, "s1, t1, 
s2, t2, w1, w2, s3, w3, t3")
+        s1 >> t1
+        s2 >> t2
+        s3 >> t3
+        [s1, s2, s3] >> w1 >> w2 >> [t1, t2, t3]
+
+        assert self.cleared_downstream(w1) == {s1, s2, s3, w1, w2, t1, t2, t3}
+        assert self.cleared_downstream(w2) == {s1, s2, s3, w2, t1, t2, t3}
+
+    def test_clearing_behavior_more_tertiary_weirdness(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, t1, s2, t2, w1, w2, s3, t3 = self.make_tasks(dag, "s1, t1, s2, 
t2, w1, w2, s3, t3")
+        s1 >> t1
+        s2 >> t2
+        s1 >> w1 >> s2 >> w2 >> [t1, t2]
+        s2 >> w2 >> t2
+        s3 >> s2 >> t3
+        s3 >> t3
+
+        def sort(task_list):
+            return sorted(x.task_id for x in task_list)
+
+        assert set(w1.get_upstreams_only_setups_and_teardowns()) == {s1, t1}
+        # s2 is included because w2 is included
+        assert self.cleared_downstream(w1) == {s1, w1, t1, s2, w2, t2, t3}
+        assert self.cleared_downstream(w2) == {s1, t1, s2, w2, t2, t3}
+        # t3 is included since s2 is included and s2 >> t3
+        # but s3 not included because it's assumed that a setup doesn't have a 
setup
+        assert self.cleared_neither(w2) == {s1, w2, t1, s2, t2, t3}
+
+        # since we're clearing upstream, s3 is upstream of w2, so s3 and t3 
are included
+        # even though w2 doesn't require them
+        # s2 and t2 are included for obvious reasons, namely that w2 requires 
s2
+        # and s1 and t1 are included for the same reason
+        # w1 included since it is upstream of w2
+        assert sort(self.cleared_upstream(w2)) == sort({s1, t1, s2, t2, s3, 
t3, w1, w2})
+
+        # t3 is included here since it's a teardown for s2
+        assert set(w2.get_upstreams_only_setups_and_teardowns()) == {s2, t2, 
s1, t1, t3}
+
+    def test_clearing_behavior_more_tertiary_weirdness2(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, t1, s2, t2, w1, w2, s3, t3 = self.make_tasks(dag, "s1, t1, s2, 
t2, w1, w2, s3, t3")
+        s1 >> t1
+        s2 >> t2
+        s1 >> w1 >> t1
+        s2 >> t1 >> t2
+
+        def sort(task_list):
+            return sorted(x.task_id for x in task_list)
+
+        # t2 included since downstream, but s2 not included since it's not 
required by t2
+        # and clearing teardown does not clear the setup
+        assert self.cleared_downstream(w1) == {s1, w1, t1, t2}
+
+        # even though t1 is cleared here, s2 and t2 are not "setup and 
teardown" for t1
+        # so they are not included
+        assert self.cleared_neither(w1) == {s1, w1, t1}
+        assert self.cleared_upstream(w1) == {s1, w1, t1}
+
+        # t1 does not have a setup or teardown
+        # but t2 is downstream so it's included
+        # and s2 is not included since clearing teardown does not clear the 
setup
+        assert self.cleared_downstream(t1) == {t1, t2}
+        # t1 does not have a setup or teardown
+        assert self.cleared_neither(t1) == {t1}
+        # s2 included since upstream, and t2 included since s2 included
+        assert self.cleared_upstream(t1) == {s1, t1, s2, t2, w1}
+
+    def test_clearing_behavior_just_teardown(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, t1 = self.make_tasks(dag, "s1, t1")
+        s1 >> t1
+        assert set(t1.get_upstreams_only_setups_and_teardowns()) == set()
+        assert self.cleared_upstream(t1) == {s1, t1}
+        assert self.cleared_downstream(t1) == {t1}
+        assert self.cleared_neither(t1) == {t1}
+        assert set(s1.get_upstreams_only_setups_and_teardowns()) == set()
+        assert self.cleared_upstream(s1) == {s1, t1}
+        assert self.cleared_downstream(s1) == {s1, t1}
+        assert self.cleared_neither(s1) == {s1, t1}

Reply via email to