pierrejeambrun commented on code in PR #58576:
URL: https://github.com/apache/airflow/pull/58576#discussion_r2607223839
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -181,40 +193,24 @@ def get_dag_structure(
dags = [latest_dag]
for serdag in serdags:
if serdag:
- dags.append(serdag.dag)
+ filtered_dag = serdag.dag
+ # Apply the same filtering to historical DAG versions
+ if root:
+ filtered_dag = filtered_dag.partial_subset(
+ task_ids=root, include_upstream=include_upstream,
include_downstream=include_downstream
+ )
+ dags.append(filtered_dag)
for dag in dags:
nodes = [task_group_to_dict_grid(x) for x in
task_group_sort(dag.task_group)]
_merge_node_dicts(merged_nodes, nodes)
# Ensure historical tasks (e.g. removed) that exist in TIs for the
selected runs are represented
- def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
- ids: set[str] = set()
- for n in nodes:
- nid = n.get("id")
- if nid:
- ids.add(nid)
- children = n.get("children")
- if children:
- ids |= _collect_ids(children) # recurse
- return ids
-
- existing_ids = _collect_ids(merged_nodes)
- historical_tasks = session.execute(
- select(TaskInstance.task_id, TaskInstance.task_display_name)
- .join(TaskInstance.dag_run)
- .where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
- .distinct()
- )
- for task_id, task_display_name in historical_tasks:
- if task_id not in existing_ids:
- merged_nodes.append(
- {
- "id": task_id,
- "label": task_display_name,
- "is_mapped": None,
- "children": None,
- }
- )
+ # Only add in case no filter is applied
+ if not root:
+ historical_nodes = collect_historical_tasks(
+ nodes=merged_nodes, dag_id=dag_id, run_ids=run_ids, session=session
+ )
+ _merge_node_dicts(merged_nodes, historical_nodes)
Review Comment:
I don't think we should do that. Historical tasks should probably still appear as
downstream and upstream of previous runs even if the structure has changed since
then (they were downstream and upstream at the time of the run).
Can we also add a test for this specific use case (historical tasks combined
with downstream / upstream filtering)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]