uranusjr commented on code in PR #33570:
URL: https://github.com/apache/airflow/pull/33570#discussion_r1309953671
##########
airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -234,59 +296,72 @@ def _iter_upstream_conditions() ->
Iterator[ColumnOperators]:
.group_by(TaskInstance.task_id)
).all()
upstream = sum(count for _, count in task_id_counts)
- upstream_setup = sum(c for t, c in task_id_counts if
upstream_tasks[t].is_setup)
+ if ti.task.is_teardown:
+ upstream_setup = sum(c for t, c in task_id_counts if
upstream_tasks[t].is_setup)
upstream_done = done >= upstream
+ setup_done = (success_setup + skipped_setup + failed_setup) >=
upstream_setup
changed = False
new_state = None
if dep_context.flag_upstream_failed:
- if trigger_rule == TR.ALL_SUCCESS:
- if upstream_failed or failed:
- new_state = TaskInstanceState.UPSTREAM_FAILED
- elif skipped:
- new_state = TaskInstanceState.SKIPPED
- elif removed and success and ti.map_index > -1:
- if ti.map_index >= success:
- new_state = TaskInstanceState.REMOVED
- elif trigger_rule == TR.ALL_FAILED:
- if success or skipped:
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.ONE_SUCCESS:
- if upstream_done and done == skipped:
- # if upstream is done and all are skipped mark as skipped
- new_state = TaskInstanceState.SKIPPED
- elif upstream_done and success <= 0:
- # if upstream is done and there are no success mark as
upstream failed
- new_state = TaskInstanceState.UPSTREAM_FAILED
- elif trigger_rule == TR.ONE_FAILED:
- if upstream_done and not (failed or upstream_failed):
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.ONE_DONE:
- if upstream_done and not (failed or success):
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.NONE_FAILED:
- if upstream_failed or failed:
- new_state = TaskInstanceState.UPSTREAM_FAILED
- elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
- if upstream_failed or failed:
- new_state = TaskInstanceState.UPSTREAM_FAILED
- elif skipped == upstream:
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.NONE_SKIPPED:
- if skipped:
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.ALL_SKIPPED:
- if success or failed:
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.ALL_DONE_SETUP_SUCCESS:
- if upstream_done and upstream_setup and skipped_setup >=
upstream_setup:
- # when there is an upstream setup and they have all
skipped, then skip
- new_state = TaskInstanceState.SKIPPED
- elif upstream_done and upstream_setup and success_setup == 0:
- # when there is an upstream setup, if none succeeded, mark
upstream failed
- # if at least one setup ran, we'll let it run
- new_state = TaskInstanceState.UPSTREAM_FAILED
+ if not task.is_teardown and failed_setup:
+ # we should exclude the teardown tasks from this check,
+ # because they should be run even if there is only one success
setup task
+ new_state = TaskInstanceState.UPSTREAM_FAILED
+ elif not task.is_teardown and upstream_setup and setup_done and
skipped_setup > 0:
+ # when there are upstream setup tasks and at least one of them
is skipped, then skip
+ new_state = TaskInstanceState.SKIPPED
+ elif not upstream_setup or setup_done:
+ # if there are no upstream setup tasks or all of them are done,
+ # and we haven't set a new state, then we can check the
upstream tasks
Review Comment:
I’d do
```python
elif upstream_setup and not setup_done:
    pass
```
and dedent the entire block below. (Not sure if I got the boolean negation
right)
##########
airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -234,59 +296,72 @@ def _iter_upstream_conditions() ->
Iterator[ColumnOperators]:
.group_by(TaskInstance.task_id)
).all()
upstream = sum(count for _, count in task_id_counts)
- upstream_setup = sum(c for t, c in task_id_counts if
upstream_tasks[t].is_setup)
+ if ti.task.is_teardown:
+ upstream_setup = sum(c for t, c in task_id_counts if
upstream_tasks[t].is_setup)
upstream_done = done >= upstream
+ setup_done = (success_setup + skipped_setup + failed_setup) >=
upstream_setup
changed = False
new_state = None
if dep_context.flag_upstream_failed:
- if trigger_rule == TR.ALL_SUCCESS:
- if upstream_failed or failed:
- new_state = TaskInstanceState.UPSTREAM_FAILED
- elif skipped:
- new_state = TaskInstanceState.SKIPPED
- elif removed and success and ti.map_index > -1:
- if ti.map_index >= success:
- new_state = TaskInstanceState.REMOVED
- elif trigger_rule == TR.ALL_FAILED:
- if success or skipped:
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.ONE_SUCCESS:
- if upstream_done and done == skipped:
- # if upstream is done and all are skipped mark as skipped
- new_state = TaskInstanceState.SKIPPED
- elif upstream_done and success <= 0:
- # if upstream is done and there are no success mark as
upstream failed
- new_state = TaskInstanceState.UPSTREAM_FAILED
- elif trigger_rule == TR.ONE_FAILED:
- if upstream_done and not (failed or upstream_failed):
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.ONE_DONE:
- if upstream_done and not (failed or success):
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.NONE_FAILED:
- if upstream_failed or failed:
- new_state = TaskInstanceState.UPSTREAM_FAILED
- elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
- if upstream_failed or failed:
- new_state = TaskInstanceState.UPSTREAM_FAILED
- elif skipped == upstream:
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.NONE_SKIPPED:
- if skipped:
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.ALL_SKIPPED:
- if success or failed:
- new_state = TaskInstanceState.SKIPPED
- elif trigger_rule == TR.ALL_DONE_SETUP_SUCCESS:
- if upstream_done and upstream_setup and skipped_setup >=
upstream_setup:
- # when there is an upstream setup and they have all
skipped, then skip
- new_state = TaskInstanceState.SKIPPED
- elif upstream_done and upstream_setup and success_setup == 0:
- # when there is an upstream setup, if none succeeded, mark
upstream failed
- # if at least one setup ran, we'll let it run
- new_state = TaskInstanceState.UPSTREAM_FAILED
+ if not task.is_teardown and failed_setup:
+ # we should exclude the teardown tasks from this check,
+ # because they should be run even if there is only one success
setup task
+ new_state = TaskInstanceState.UPSTREAM_FAILED
+ elif not task.is_teardown and upstream_setup and setup_done and
skipped_setup > 0:
+ # when there are upstream setup tasks and at least one of them
is skipped, then skip
+ new_state = TaskInstanceState.SKIPPED
+ elif not upstream_setup or setup_done:
+ # if there are no upstream setup tasks or all of them are done,
+ # and we haven't set a new state, then we can check the
upstream tasks
Review Comment:
I’d do
```python
elif upstream_setup and not setup_done:
    pass
```
and dedent the entire block below. (Not sure if I got the boolean negation
right)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]