OscarLigthart commented on code in PR #58576:
URL: https://github.com/apache/airflow/pull/58576#discussion_r2618812256


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -181,40 +193,24 @@ def get_dag_structure(
     dags = [latest_dag]
     for serdag in serdags:
         if serdag:
-            dags.append(serdag.dag)
+            filtered_dag = serdag.dag
+            # Apply the same filtering to historical DAG versions
+            if root:
+                filtered_dag = filtered_dag.partial_subset(
+                    task_ids=root, include_upstream=include_upstream, 
include_downstream=include_downstream
+                )
+            dags.append(filtered_dag)
     for dag in dags:
         nodes = [task_group_to_dict_grid(x) for x in 
task_group_sort(dag.task_group)]
         _merge_node_dicts(merged_nodes, nodes)
 
     # Ensure historical tasks (e.g. removed) that exist in TIs for the 
selected runs are represented
-    def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
-        ids: set[str] = set()
-        for n in nodes:
-            nid = n.get("id")
-            if nid:
-                ids.add(nid)
-            children = n.get("children")
-            if children:
-                ids |= _collect_ids(children)  # recurse
-        return ids
-
-    existing_ids = _collect_ids(merged_nodes)
-    historical_tasks = session.execute(
-        select(TaskInstance.task_id, TaskInstance.task_display_name)
-        .join(TaskInstance.dag_run)
-        .where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
-        .distinct()
-    )
-    for task_id, task_display_name in historical_tasks:
-        if task_id not in existing_ids:
-            merged_nodes.append(
-                {
-                    "id": task_id,
-                    "label": task_display_name,
-                    "is_mapped": None,
-                    "children": None,
-                }
-            )
+    # Only add in case no filter is applied
+    if not root:
+        historical_nodes = collect_historical_tasks(
+            nodes=merged_nodes, dag_id=dag_id, run_ids=run_ids, session=session
+        )
+        _merge_node_dicts(merged_nodes, historical_nodes)

Review Comment:
   @pierrejeambrun removed the code to get historical tasks a second time, and 
added a historical task to my test cases. Behaviour is as expected, so I 
believe you were right and this code is redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to