pierrejeambrun commented on code in PR #58576:
URL: https://github.com/apache/airflow/pull/58576#discussion_r2607223839


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -181,40 +193,24 @@ def get_dag_structure(
     dags = [latest_dag]
     for serdag in serdags:
         if serdag:
-            dags.append(serdag.dag)
+            filtered_dag = serdag.dag
+            # Apply the same filtering to historical DAG versions
+            if root:
+                filtered_dag = filtered_dag.partial_subset(
+                    task_ids=root, include_upstream=include_upstream, 
include_downstream=include_downstream
+                )
+            dags.append(filtered_dag)
     for dag in dags:
         nodes = [task_group_to_dict_grid(x) for x in 
task_group_sort(dag.task_group)]
         _merge_node_dicts(merged_nodes, nodes)
 
     # Ensure historical tasks (e.g. removed) that exist in TIs for the 
selected runs are represented
-    def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
-        ids: set[str] = set()
-        for n in nodes:
-            nid = n.get("id")
-            if nid:
-                ids.add(nid)
-            children = n.get("children")
-            if children:
-                ids |= _collect_ids(children)  # recurse
-        return ids
-
-    existing_ids = _collect_ids(merged_nodes)
-    historical_tasks = session.execute(
-        select(TaskInstance.task_id, TaskInstance.task_display_name)
-        .join(TaskInstance.dag_run)
-        .where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
-        .distinct()
-    )
-    for task_id, task_display_name in historical_tasks:
-        if task_id not in existing_ids:
-            merged_nodes.append(
-                {
-                    "id": task_id,
-                    "label": task_display_name,
-                    "is_mapped": None,
-                    "children": None,
-                }
-            )
+    # Only add in case no filter is applied
+    if not root:
+        historical_nodes = collect_historical_tasks(
+            nodes=merged_nodes, dag_id=dag_id, run_ids=run_ids, session=session
+        )
+        _merge_node_dicts(merged_nodes, historical_nodes)

Review Comment:
   Historical tasks should probably still appear as upstream/downstream of the 
filtered root for previous runs, even if the DAG structure has changed since 
then (they were upstream/downstream at the time of the run). What do you think?
   
   Because `collect_historical_tasks` is skipped entirely whenever `root` is 
set, those upstream/downstream tasks will be missing from the grid when we 
select an older DAG run whose structure differs. That's probably not what we 
want.
   
   Can we also add a test for this specific case (historical tasks combined 
with upstream/downstream filtering)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to