ephraimbuddy commented on code in PR #31658:
URL: https://github.com/apache/airflow/pull/31658#discussion_r1213638110


##########
tests/models/test_dagrun.py:
##########
@@ -2469,3 +2470,71 @@ def mytask():
     session.flush()
     dr = session.query(DagRun).one()
     assert dr.state == DagRunState.FAILED
+
+
[email protected](
+    "input, expected",
+    [
+        (["s1 >> w1 >> t1"], {"w1"}),  # t1 ignored
+        (["s1 >> w1 >> t1", "s1 >> t1"], {"w1"}),  # t1 ignored; properly 
wired to setup
+        (["s1 >> w1"], {"w1"}),  # no teardown
+        (["s1 >> w1 >> t1_"], {"t1_"}),  # t1_ is natural leaf and OFFD=True;
+        (["s1 >> w1 >> t1_", "s1 >> t1_"], {"t1_"}),  # t1_ is natural leaf 
and OFFD=True; wired to setup
+        (["s1 >> w1 >> t1_ >> w2", "s1 >> t1_"], {"w2"}),  # t1_ is not a 
natural leaf so excluded anyway
+    ],
+)
+def test_tis_considered_for_state(dag_maker, session, input, expected):
+    """
+    We use a convenience notation to wire up test scenarios:
+
+    t<num> -- teardown task
+    t<num>_ -- teardown task with on_failure_fail_dagrun = True
+    s<num>_ -- setup task
+    w<num> -- work task (a.k.a. normal task)
+
+    In the test input, each line is a statement. We'll automatically create 
the tasks and wire them up
+    as indicated in the test input.
+    """
+
+    @teardown()
+    def teardown_task():
+        print(1)
+
+    # todo: should not have to do this; should be able to use override

Review Comment:
   I have made a PR to fix this: https://github.com/apache/airflow/pull/31665



##########
airflow/models/dagrun.py:
##########
@@ -533,6 +533,28 @@ def get_previous_scheduled_dagrun(self, session: Session = 
NEW_SESSION) -> DagRu
             .first()
         )
 
+    def _tis_for_dagrun_state(self, *, dag, tis):
+        """
+        Return the collection of tasks that should be considered for 
evaluation of terminal dag run state.
+
+        Teardown tasks by default are not considered for the purpose of dag 
run state.  But
+        users may enable such consideration with on_failure_fail_dagrun.
+        """
+
+        def is_effective_leaf(task):
+            for down_task_id in task.downstream_task_ids:
+                down_task = dag.get_task(down_task_id)
+                if not down_task.is_teardown or 
down_task.on_failure_fail_dagrun is True:

Review Comment:
   ```suggestion
                   if not down_task.is_teardown or 
down_task.on_failure_fail_dagrun:
   ```



##########
airflow/models/dagrun.py:
##########
@@ -533,6 +533,28 @@ def get_previous_scheduled_dagrun(self, session: Session = 
NEW_SESSION) -> DagRu
             .first()
         )
 
+    def _tis_for_dagrun_state(self, *, dag, tis):
+        """
+        Return the collection of tasks that should be considered for 
evaluation of terminal dag run state.
+
+        Teardown tasks by default are not considered for the purpose of dag 
run state.  But
+        users may enable such consideration with on_failure_fail_dagrun.
+        """
+
+        def is_effective_leaf(task):
+            for down_task_id in task.downstream_task_ids:
+                down_task = dag.get_task(down_task_id)
+                if not down_task.is_teardown or 
down_task.on_failure_fail_dagrun is True:
+                    # we found a down task that is not ignorable; not a leaf
+                    return False
+            # we found no ignorable downstreams
+            # evaluate whether task is itself ignorable
+            return not task.is_teardown or task.on_failure_fail_dagrun is True

Review Comment:
   ```suggestion
               return not task.is_teardown or task.on_failure_fail_dagrun
   ```
   



##########
airflow/models/dagrun.py:
##########
@@ -1248,7 +1259,7 @@ def get_run(session: Session, dag_id: str, 
execution_date: datetime) -> DagRun |
             session.query(DagRun)
             .filter(
                 DagRun.dag_id == dag_id,
-                DagRun.external_trigger == False,  # noqa
+                DagRun.external_trigger is False,

Review Comment:
   Interesting that `is` works



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to