uranusjr commented on code in PR #33570: URL: https://github.com/apache/airflow/pull/33570#discussion_r1309953671
########## airflow/ti_deps/deps/trigger_rule_dep.py: ########## @@ -234,59 +296,72 @@ def _iter_upstream_conditions() -> Iterator[ColumnOperators]: .group_by(TaskInstance.task_id) ).all() upstream = sum(count for _, count in task_id_counts) - upstream_setup = sum(c for t, c in task_id_counts if upstream_tasks[t].is_setup) + if ti.task.is_teardown: + upstream_setup = sum(c for t, c in task_id_counts if upstream_tasks[t].is_setup) upstream_done = done >= upstream + setup_done = (success_setup + skipped_setup + failed_setup) >= upstream_setup changed = False new_state = None if dep_context.flag_upstream_failed: - if trigger_rule == TR.ALL_SUCCESS: - if upstream_failed or failed: - new_state = TaskInstanceState.UPSTREAM_FAILED - elif skipped: - new_state = TaskInstanceState.SKIPPED - elif removed and success and ti.map_index > -1: - if ti.map_index >= success: - new_state = TaskInstanceState.REMOVED - elif trigger_rule == TR.ALL_FAILED: - if success or skipped: - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.ONE_SUCCESS: - if upstream_done and done == skipped: - # if upstream is done and all are skipped mark as skipped - new_state = TaskInstanceState.SKIPPED - elif upstream_done and success <= 0: - # if upstream is done and there are no success mark as upstream failed - new_state = TaskInstanceState.UPSTREAM_FAILED - elif trigger_rule == TR.ONE_FAILED: - if upstream_done and not (failed or upstream_failed): - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.ONE_DONE: - if upstream_done and not (failed or success): - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.NONE_FAILED: - if upstream_failed or failed: - new_state = TaskInstanceState.UPSTREAM_FAILED - elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS: - if upstream_failed or failed: - new_state = TaskInstanceState.UPSTREAM_FAILED - elif skipped == upstream: - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.NONE_SKIPPED: - if skipped: - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.ALL_SKIPPED: - if success or failed: - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.ALL_DONE_SETUP_SUCCESS: - if upstream_done and upstream_setup and skipped_setup >= upstream_setup: - # when there is an upstream setup and they have all skipped, then skip - new_state = TaskInstanceState.SKIPPED - elif upstream_done and upstream_setup and success_setup == 0: - # when there is an upstream setup, if none succeeded, mark upstream failed - # if at least one setup ran, we'll let it run - new_state = TaskInstanceState.UPSTREAM_FAILED + if not task.is_teardown and failed_setup: + # we should exclude the teardown tasks from this check, + # because they should be run even if there is only one success setup task + new_state = TaskInstanceState.UPSTREAM_FAILED + elif not task.is_teardown and upstream_setup and setup_done and skipped_setup > 0: + # when there are upstream setup tasks and at least one of them is skipped, then skip + new_state = TaskInstanceState.SKIPPED + elif not upstream_setup or setup_done: + # if there are no upstream setup tasks or all of them are done, + # and we haven't set a new state, then we can check the upstream tasks Review Comment: I’d do ```python elif upstream_setup and not setup_down: pass ``` and dedent the entire block below. (Not sure if I got the boolean negation right) ########## airflow/ti_deps/deps/trigger_rule_dep.py: ########## @@ -234,59 +296,72 @@ def _iter_upstream_conditions() -> Iterator[ColumnOperators]: .group_by(TaskInstance.task_id) ).all() upstream = sum(count for _, count in task_id_counts) - upstream_setup = sum(c for t, c in task_id_counts if upstream_tasks[t].is_setup) + if ti.task.is_teardown: + upstream_setup = sum(c for t, c in task_id_counts if upstream_tasks[t].is_setup) upstream_done = done >= upstream + setup_done = (success_setup + skipped_setup + failed_setup) >= upstream_setup changed = False new_state = None if dep_context.flag_upstream_failed: - if trigger_rule == TR.ALL_SUCCESS: - if upstream_failed or failed: - new_state = TaskInstanceState.UPSTREAM_FAILED - elif skipped: - new_state = TaskInstanceState.SKIPPED - elif removed and success and ti.map_index > -1: - if ti.map_index >= success: - new_state = TaskInstanceState.REMOVED - elif trigger_rule == TR.ALL_FAILED: - if success or skipped: - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.ONE_SUCCESS: - if upstream_done and done == skipped: - # if upstream is done and all are skipped mark as skipped - new_state = TaskInstanceState.SKIPPED - elif upstream_done and success <= 0: - # if upstream is done and there are no success mark as upstream failed - new_state = TaskInstanceState.UPSTREAM_FAILED - elif trigger_rule == TR.ONE_FAILED: - if upstream_done and not (failed or upstream_failed): - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.ONE_DONE: - if upstream_done and not (failed or success): - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.NONE_FAILED: - if upstream_failed or failed: - new_state = TaskInstanceState.UPSTREAM_FAILED - elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS: - if upstream_failed or failed: - new_state = TaskInstanceState.UPSTREAM_FAILED - elif skipped == upstream: - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.NONE_SKIPPED: - if skipped: - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.ALL_SKIPPED: - if success or failed: - new_state = TaskInstanceState.SKIPPED - elif trigger_rule == TR.ALL_DONE_SETUP_SUCCESS: - if upstream_done and upstream_setup and skipped_setup >= upstream_setup: - # when there is an upstream setup and they have all skipped, then skip - new_state = TaskInstanceState.SKIPPED - elif upstream_done and upstream_setup and success_setup == 0: - # when there is an upstream setup, if none succeeded, mark upstream failed - # if at least one setup ran, we'll let it run - new_state = TaskInstanceState.UPSTREAM_FAILED + if not task.is_teardown and failed_setup: + # we should exclude the teardown tasks from this check, + # because they should be run even if there is only one success setup task + new_state = TaskInstanceState.UPSTREAM_FAILED + elif not task.is_teardown and upstream_setup and setup_done and skipped_setup > 0: + # when there are upstream setup tasks and at least one of them is skipped, then skip + new_state = TaskInstanceState.SKIPPED + elif not upstream_setup or setup_done: + # if there are no upstream setup tasks or all of them are done, + # and we haven't set a new state, then we can check the upstream tasks Review Comment: I’d do ```python elif upstream_setup and not setup_down: pass ``` and dedent the entire block below. (Not sure if I got the boolean negation right) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org