ephraimbuddy commented on code in PR #58987:
URL: https://github.com/apache/airflow/pull/58987#discussion_r2583863865
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -729,32 +729,54 @@ def post_clear_task_instances(
if future:
body.end_date = None
- task_ids = body.task_ids
- if task_ids is not None:
- tasks = set(task_ids)
- mapped_tasks_tuples = set(t for t in tasks if isinstance(t, tuple))
+ if (task_markers_to_clear := body.task_ids) is not None:
+ mapped_tasks_tuples = {t for t in task_markers_to_clear if
isinstance(t, tuple)}
# Unmapped tasks are expressed in their task_ids (without map_indexes)
- unmapped_task_ids = set(t for t in tasks if not isinstance(t, tuple))
-
- if upstream or downstream:
- mapped_task_ids = set(tid for tid, _ in mapped_tasks_tuples)
- relatives = dag.partial_subset(
- task_ids=unmapped_task_ids | mapped_task_ids,
- include_downstream=downstream,
- include_upstream=upstream,
- exclude_original=True,
- )
- unmapped_task_ids = unmapped_task_ids |
set(relatives.task_dict.keys())
+ normal_task_ids = {t for t in task_markers_to_clear if not
isinstance(t, tuple)}
+
+ def _collect_relatives(run_id: str, direction: Literal["upstream",
"downstream"]) -> None:
+ from airflow.models.taskinstance import find_relevant_relatives
- mapped_tasks_list = [
- (tid, map_id) for tid, map_id in mapped_tasks_tuples if tid not in
unmapped_task_ids
+ relevant_relatives = find_relevant_relatives(
+ normal_task_ids,
+ mapped_tasks_tuples,
+ dag=dag,
+ run_id=run_id,
+ direction=direction,
+ session=session,
+ )
+ normal_task_ids.update(t for t in relevant_relatives if not
isinstance(t, tuple))
+ mapped_tasks_tuples.update(t for t in relevant_relatives if
isinstance(t, tuple))
+
+ # We can't easily calculate upstream/downstream map indexes when not
+ # working for a specific dag run. It's possible by looking at the runs
+ # one by one, but that is both resource-consuming and logically
complex.
+ # So instead we'll just clear all the tis based on task ID and hope
+ # that's good enough for most cases.
+ if dag_run_id is None:
+ if upstream or downstream:
+ partial_dag = dag.partial_subset(
+ task_ids=normal_task_ids.union(tid for tid, _ in
mapped_tasks_tuples),
+ include_downstream=downstream,
+ include_upstream=upstream,
+ exclude_original=True,
+ )
+ normal_task_ids.update(partial_dag.task_dict)
+ else:
+ if upstream:
+ _collect_relatives(dag_run_id, "upstream")
+ if downstream:
+ _collect_relatives(dag_run_id, "downstream")
Review Comment:
Let's merge, since it's a backport.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]