ashb commented on a change in pull request #13037:
URL: https://github.com/apache/airflow/pull/13037#discussion_r542374344
##########
File path: airflow/www/views.py
##########
@@ -1741,16 +1742,44 @@ def _mark_task_instance_state( # pylint:
disable=too-many-arguments
from airflow.api.common.experimental.mark_tasks import set_state
if confirmed:
- altered = set_state(
- tasks=[task],
- execution_date=execution_date,
- upstream=upstream,
- downstream=downstream,
- future=future,
- past=past,
- state=state,
- commit=True,
- )
+ with create_session() as session:
+ altered = set_state(
+ tasks=[task],
+ execution_date=execution_date,
+ upstream=upstream,
+ downstream=downstream,
+ future=future,
+ past=past,
+ state=state,
+ commit=True,
+ session=session,
+ )
+
+ if state == State.SUCCESS and resume:
+ # If Resume is checked when marking success, clear
downstream tasks that
+ # are in upstream_failed state to resume them.
+
+ # Flush the session so that the tasks marked success are
reflected in the db.
+ session.flush()
+ subdag = dag.sub_dag(
+ task_ids_or_regex=fr"^{task_id}$",
+ include_downstream=True,
+ include_upstream=False,
+ )
+
+ end_date = execution_date if not future else None
+ start_date = execution_date if not past else None
+
+ subdag.clear(
+ start_date=start_date,
+ end_date=end_date,
+ include_subdags=True,
+ include_parentdag=True,
+ only_failed=True,
Review comment:
This is technically going to clear FAILED as well as UPSTREAM_FAILED.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]