jedcunningham commented on code in PR #23079:
URL: https://github.com/apache/airflow/pull/23079#discussion_r856552185
##########
airflow/models/dag.py:
##########
@@ -1687,18 +1687,25 @@ def set_task_instance_state(
end_date = resolve_execution_date if not future else None
start_date = resolve_execution_date if not past else None
- subdag.clear(
- start_date=start_date,
- end_date=end_date,
- include_subdags=True,
- include_parentdag=True,
- only_failed=True,
- session=session,
- # Exclude the task itself from being cleared
- exclude_task_ids={task_id},
- )
+ # When state to be set is FAILED with downstream as True then
+ # passing only_failed will clear all failed instances just marked in
+ # downstream. Don't clear and return altered task instances.
+ # When state to be set is FAILED on an already Failed task with downstream as False
+ # then there are no altered objects and hence we don't need to clear them.
+ if state == TaskInstanceState.FAILED and (downstream or not altered):
+ return altered
+ else:
+ subdag.clear(
+ start_date=start_date,
+ end_date=end_date,
+ include_subdags=True,
+ include_parentdag=True,
+ only_failed=True,
+ session=session,
+ exclude_task_ids=[task_id],
+ )
- return altered
+ return altered
Review Comment:
```suggestion
return altered
```
##########
airflow/models/dag.py:
##########
@@ -1687,18 +1687,25 @@ def set_task_instance_state(
end_date = resolve_execution_date if not future else None
start_date = resolve_execution_date if not past else None
- subdag.clear(
- start_date=start_date,
- end_date=end_date,
- include_subdags=True,
- include_parentdag=True,
- only_failed=True,
- session=session,
- # Exclude the task itself from being cleared
- exclude_task_ids={task_id},
- )
+ # When state to be set is FAILED with downstream as True then
+ # passing only_failed will clear all failed instances just marked in
+ # downstream. Don't clear and return altered task instances.
+ # When state to be set is FAILED on an already Failed task with downstream as False
+ # then there are no altered objects and hence we don't need to clear them.
+ if state == TaskInstanceState.FAILED and (downstream or not altered):
+ return altered
+ else:
+ subdag.clear(
+ start_date=start_date,
+ end_date=end_date,
+ include_subdags=True,
+ include_parentdag=True,
+ only_failed=True,
+ session=session,
+ exclude_task_ids=[task_id],
+ )
Review Comment:
```suggestion
subdag.clear(
start_date=start_date,
end_date=end_date,
include_subdags=True,
include_parentdag=True,
only_failed=True,
session=session,
exclude_task_ids=[task_id],
)
```
We don't need the `else`, since the `if` branch already returns.
Alternatively, we could invert the `if` and have a single return.
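
For illustration, the two options could look roughly like this. This is a simplified sketch and not the actual Airflow code: `FAILED`, the `clear` callback, and both function names are hypothetical stand-ins for `TaskInstanceState.FAILED`, the `subdag.clear(...)` call, and the tail of `set_task_instance_state`.

```python
from typing import Callable, List

# Hypothetical stand-in for TaskInstanceState.FAILED.
FAILED = "failed"


def finish_early_return(
    state: str, downstream: bool, altered: List[str], clear: Callable[[], None]
) -> List[str]:
    """Option 1: keep the guard clause and drop the else."""
    if state == FAILED and (downstream or not altered):
        return altered
    # Stand-in for subdag.clear(..., only_failed=True, exclude_task_ids=[task_id]).
    clear()
    return altered


def finish_single_return(
    state: str, downstream: bool, altered: List[str], clear: Callable[[], None]
) -> List[str]:
    """Option 2: invert the condition so there is a single return."""
    if not (state == FAILED and (downstream or not altered)):
        # Stand-in for the same subdag.clear(...) call.
        clear()
    return altered
```

Both variants skip the clear in the same cases, so picking one is purely a readability choice.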