jedcunningham commented on code in PR #30271:
URL: https://github.com/apache/airflow/pull/30271#discussion_r1231601592


##########
tests/models/test_dag.py:
##########
@@ -3524,3 +3519,259 @@ def 
test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagR
     assert str(ctx.value) == (
         f"A manual DAG run cannot use ID {run_id!r} since it is reserved for 
{run_id_type.value} runs"
     )
+
+
+class TestTaskClearingSetupTeardownBehavior:
+    """
+    Task clearing behavior is mainly controlled by dag.partial_subset.
+    Here we verify, primarily with regard to setups and teardowns, the
+    behavior of dag.partial_subset but also the supporting methods defined
+    on AbstractOperator.
+    """
+
+    @staticmethod
+    def make_tasks(dag, input_str):
+        """
+        Helper for building setup and teardown tasks for testing.
+
+        Given an input such as 's1, w1, t1, tf1', returns setup task "s1", 
normal task "w1"
+        (the w means *work*), teardown task "t1", and teardown task "tf1" 
where the f means
+        on_failure_fail_dagrun has been set to true.
+        """
+
+        def teardown_task(task_id):
+            return BaseOperator.as_teardown(task_id=task_id)
+
+        def teardown_task_f(task_id):
+            return BaseOperator.as_teardown(task_id=task_id, 
on_failure_fail_dagrun=True)
+
+        def work_task(task_id):
+            return BaseOperator(task_id=task_id)
+
+        def setup_task(task_id):
+            return BaseOperator.as_setup(task_id=task_id)
+
+        def make_task(task_id):
+            """
+            Task factory helper.
+
+            Will give a setup, teardown, work, or teardown-with-dagrun-failure 
task depending on input.
+            """
+            if task_id.startswith("s"):
+                factory = setup_task
+            elif task_id.startswith("w"):
+                factory = work_task
+            elif task_id.startswith("tf"):
+                factory = teardown_task_f
+            elif task_id.startswith("t"):
+                factory = teardown_task
+            else:
+                raise ValueError("unexpected")
+            return dag.task_dict.get(task_id) or factory(task_id=task_id)
+
+        return (make_task(x) for x in input_str.split(", "))
+
+    @staticmethod
+    def cleared_downstream(task):
+        """Helper to return tasks that would be cleared if **downstream** 
selected."""
+        upstream = False
+        return set(
+            task.dag.partial_subset(
+                task_ids_or_regex=[task.task_id],
+                include_downstream=not upstream,
+                include_upstream=upstream,
+            ).tasks
+        )
+
+    @staticmethod
+    def cleared_upstream(task):
+        """Helper to return tasks that would be cleared if **upstream** 
selected."""
+        upstream = True
+        return set(
+            task.dag.partial_subset(
+                task_ids_or_regex=[task.task_id],
+                include_downstream=not upstream,
+                include_upstream=upstream,
+            ).tasks
+        )
+
+    def test_get_flat_relative_ids_with_setup(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1")
+
+        s1 >> w1 >> w2 >> w3
+
+        # there is no teardown downstream of w2, so we assume w2 does not need 
s1
+        assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
+        assert self.cleared_downstream(w2) == {w2, w3}
+
+        w3 >> t1
+
+        # now, w2 has a downstream teardown, but it's not connected directly 
to s1
+        # (this is how we signal "this is the teardown for this setup")
+        # so still, we don't regard s1 as a setup for w2
+        assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()
+        assert self.cleared_downstream(w2) == {w2, w3, t1}

Review Comment:
   And probably worth checking that w1 now has both s1 and t1.



##########
tests/models/test_dag.py:
##########
@@ -3524,3 +3519,259 @@ def 
test_create_dagrun_disallow_manual_to_use_automated_run_id(run_id_type: DagR
     assert str(ctx.value) == (
         f"A manual DAG run cannot use ID {run_id!r} since it is reserved for 
{run_id_type.value} runs"
     )
+
+
+class TestTaskClearingSetupTeardownBehavior:
+    """
+    Task clearing behavior is mainly controlled by dag.partial_subset.
+    Here we verify, primarily with regard to setups and teardowns, the
+    behavior of dag.partial_subset but also the supporting methods defined
+    on AbstractOperator.
+    """
+
+    @staticmethod
+    def make_tasks(dag, input_str):
+        """
+        Helper for building setup and teardown tasks for testing.
+
+        Given an input such as 's1, w1, t1, tf1', returns setup task "s1", 
normal task "w1"
+        (the w means *work*), teardown task "t1", and teardown task "tf1" 
where the f means
+        on_failure_fail_dagrun has been set to true.
+        """
+
+        def teardown_task(task_id):
+            return BaseOperator.as_teardown(task_id=task_id)
+
+        def teardown_task_f(task_id):
+            return BaseOperator.as_teardown(task_id=task_id, 
on_failure_fail_dagrun=True)
+
+        def work_task(task_id):
+            return BaseOperator(task_id=task_id)
+
+        def setup_task(task_id):
+            return BaseOperator.as_setup(task_id=task_id)
+
+        def make_task(task_id):
+            """
+            Task factory helper.
+
+            Will give a setup, teardown, work, or teardown-with-dagrun-failure 
task depending on input.
+            """
+            if task_id.startswith("s"):
+                factory = setup_task
+            elif task_id.startswith("w"):
+                factory = work_task
+            elif task_id.startswith("tf"):
+                factory = teardown_task_f
+            elif task_id.startswith("t"):
+                factory = teardown_task
+            else:
+                raise ValueError("unexpected")
+            return dag.task_dict.get(task_id) or factory(task_id=task_id)
+
+        return (make_task(x) for x in input_str.split(", "))
+
+    @staticmethod
+    def cleared_downstream(task):
+        """Helper to return tasks that would be cleared if **downstream** 
selected."""
+        upstream = False
+        return set(
+            task.dag.partial_subset(
+                task_ids_or_regex=[task.task_id],
+                include_downstream=not upstream,
+                include_upstream=upstream,
+            ).tasks
+        )
+
+    @staticmethod
+    def cleared_upstream(task):
+        """Helper to return tasks that would be cleared if **upstream** 
selected."""
+        upstream = True
+        return set(
+            task.dag.partial_subset(
+                task_ids_or_regex=[task.task_id],
+                include_downstream=not upstream,
+                include_upstream=upstream,
+            ).tasks
+        )
+
+    def test_get_flat_relative_ids_with_setup(self):
+        with DAG(dag_id="test_dag", start_date=pendulum.now()) as dag:
+            s1, w1, w2, w3, t1 = self.make_tasks(dag, "s1, w1, w2, w3, t1")
+
+        s1 >> w1 >> w2 >> w3
+
+        # there is no teardown downstream of w2, so we assume w2 does not need 
s1
+        assert set(w2.get_upstreams_only_setups_and_teardowns()) == set()

Review Comment:
   I feel like we should check that w1 does consider s1 a setup here before we 
add any other relationships.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to