uranusjr commented on code in PR #33570:
URL: https://github.com/apache/airflow/pull/33570#discussion_r1309953671


##########
airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -234,59 +296,72 @@ def _iter_upstream_conditions() -> 
Iterator[ColumnOperators]:
                 .group_by(TaskInstance.task_id)
             ).all()
             upstream = sum(count for _, count in task_id_counts)
-            upstream_setup = sum(c for t, c in task_id_counts if 
upstream_tasks[t].is_setup)
+            if ti.task.is_teardown:
+                upstream_setup = sum(c for t, c in task_id_counts if 
upstream_tasks[t].is_setup)
 
         upstream_done = done >= upstream
+        setup_done = (success_setup + skipped_setup + failed_setup) >= 
upstream_setup
 
         changed = False
         new_state = None
         if dep_context.flag_upstream_failed:
-            if trigger_rule == TR.ALL_SUCCESS:
-                if upstream_failed or failed:
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
-                elif skipped:
-                    new_state = TaskInstanceState.SKIPPED
-                elif removed and success and ti.map_index > -1:
-                    if ti.map_index >= success:
-                        new_state = TaskInstanceState.REMOVED
-            elif trigger_rule == TR.ALL_FAILED:
-                if success or skipped:
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.ONE_SUCCESS:
-                if upstream_done and done == skipped:
-                    # if upstream is done and all are skipped mark as skipped
-                    new_state = TaskInstanceState.SKIPPED
-                elif upstream_done and success <= 0:
-                    # if upstream is done and there are no success mark as 
upstream failed
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
-            elif trigger_rule == TR.ONE_FAILED:
-                if upstream_done and not (failed or upstream_failed):
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.ONE_DONE:
-                if upstream_done and not (failed or success):
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.NONE_FAILED:
-                if upstream_failed or failed:
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
-            elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
-                if upstream_failed or failed:
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
-                elif skipped == upstream:
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.NONE_SKIPPED:
-                if skipped:
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.ALL_SKIPPED:
-                if success or failed:
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.ALL_DONE_SETUP_SUCCESS:
-                if upstream_done and upstream_setup and skipped_setup >= 
upstream_setup:
-                    # when there is an upstream setup and they have all 
skipped, then skip
-                    new_state = TaskInstanceState.SKIPPED
-                elif upstream_done and upstream_setup and success_setup == 0:
-                    # when there is an upstream setup, if none succeeded, mark 
upstream failed
-                    # if at least one setup ran, we'll let it run
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
+            if not task.is_teardown and failed_setup:
+                # we should exclude the teardown tasks from this check,
+                # because they should be run even if there is only one success 
setup task
+                new_state = TaskInstanceState.UPSTREAM_FAILED
+            elif not task.is_teardown and upstream_setup and setup_done and 
skipped_setup > 0:
+                # when there are upstream setup tasks and at least one of them 
is skipped, then skip
+                new_state = TaskInstanceState.SKIPPED
+            elif not upstream_setup or setup_done:
+                # if there are no upstream setup tasks or all of them are done,
+                # and we haven't set a new state, then we can check the 
upstream tasks

Review Comment:
   I’d do
   
   ```python
   elif upstream_setup and not setup_done:
       pass
   ```
   
   and dedent the entire block below. (Not sure if I got the boolean negation right)



##########
airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -234,59 +296,72 @@ def _iter_upstream_conditions() -> 
Iterator[ColumnOperators]:
                 .group_by(TaskInstance.task_id)
             ).all()
             upstream = sum(count for _, count in task_id_counts)
-            upstream_setup = sum(c for t, c in task_id_counts if 
upstream_tasks[t].is_setup)
+            if ti.task.is_teardown:
+                upstream_setup = sum(c for t, c in task_id_counts if 
upstream_tasks[t].is_setup)
 
         upstream_done = done >= upstream
+        setup_done = (success_setup + skipped_setup + failed_setup) >= 
upstream_setup
 
         changed = False
         new_state = None
         if dep_context.flag_upstream_failed:
-            if trigger_rule == TR.ALL_SUCCESS:
-                if upstream_failed or failed:
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
-                elif skipped:
-                    new_state = TaskInstanceState.SKIPPED
-                elif removed and success and ti.map_index > -1:
-                    if ti.map_index >= success:
-                        new_state = TaskInstanceState.REMOVED
-            elif trigger_rule == TR.ALL_FAILED:
-                if success or skipped:
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.ONE_SUCCESS:
-                if upstream_done and done == skipped:
-                    # if upstream is done and all are skipped mark as skipped
-                    new_state = TaskInstanceState.SKIPPED
-                elif upstream_done and success <= 0:
-                    # if upstream is done and there are no success mark as 
upstream failed
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
-            elif trigger_rule == TR.ONE_FAILED:
-                if upstream_done and not (failed or upstream_failed):
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.ONE_DONE:
-                if upstream_done and not (failed or success):
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.NONE_FAILED:
-                if upstream_failed or failed:
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
-            elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
-                if upstream_failed or failed:
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
-                elif skipped == upstream:
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.NONE_SKIPPED:
-                if skipped:
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.ALL_SKIPPED:
-                if success or failed:
-                    new_state = TaskInstanceState.SKIPPED
-            elif trigger_rule == TR.ALL_DONE_SETUP_SUCCESS:
-                if upstream_done and upstream_setup and skipped_setup >= 
upstream_setup:
-                    # when there is an upstream setup and they have all 
skipped, then skip
-                    new_state = TaskInstanceState.SKIPPED
-                elif upstream_done and upstream_setup and success_setup == 0:
-                    # when there is an upstream setup, if none succeeded, mark 
upstream failed
-                    # if at least one setup ran, we'll let it run
-                    new_state = TaskInstanceState.UPSTREAM_FAILED
+            if not task.is_teardown and failed_setup:
+                # we should exclude the teardown tasks from this check,
+                # because they should be run even if there is only one success 
setup task
+                new_state = TaskInstanceState.UPSTREAM_FAILED
+            elif not task.is_teardown and upstream_setup and setup_done and 
skipped_setup > 0:
+                # when there are upstream setup tasks and at least one of them 
is skipped, then skip
+                new_state = TaskInstanceState.SKIPPED
+            elif not upstream_setup or setup_done:
+                # if there are no upstream setup tasks or all of them are done,
+                # and we haven't set a new state, then we can check the 
upstream tasks

Review Comment:
   I’d do
   
   ```python
   elif upstream_setup and not setup_done:
       pass
   ```
   
   and dedent the entire block below. (Not sure if I got the boolean negation right)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to