pierrejeambrun commented on code in PR #58576:
URL: https://github.com/apache/airflow/pull/58576#discussion_r2607291772


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -181,40 +193,24 @@ def get_dag_structure(
     dags = [latest_dag]
     for serdag in serdags:
         if serdag:
-            dags.append(serdag.dag)
+            filtered_dag = serdag.dag
+            # Apply the same filtering to historical DAG versions
+            if root:
+                filtered_dag = filtered_dag.partial_subset(
+                    task_ids=root, include_upstream=include_upstream, 
include_downstream=include_downstream
+                )
+            dags.append(filtered_dag)
     for dag in dags:
         nodes = [task_group_to_dict_grid(x) for x in 
task_group_sort(dag.task_group)]
         _merge_node_dicts(merged_nodes, nodes)
 
     # Ensure historical tasks (e.g. removed) that exist in TIs for the 
selected runs are represented
-    def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
-        ids: set[str] = set()
-        for n in nodes:
-            nid = n.get("id")
-            if nid:
-                ids.add(nid)
-            children = n.get("children")
-            if children:
-                ids |= _collect_ids(children)  # recurse
-        return ids
-
-    existing_ids = _collect_ids(merged_nodes)
-    historical_tasks = session.execute(
-        select(TaskInstance.task_id, TaskInstance.task_display_name)
-        .join(TaskInstance.dag_run)
-        .where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
-        .distinct()
-    )
-    for task_id, task_display_name in historical_tasks:
-        if task_id not in existing_ids:
-            merged_nodes.append(
-                {
-                    "id": task_id,
-                    "label": task_display_name,
-                    "is_mapped": None,
-                    "children": None,
-                }
-            )
+    # Only add in case no filter is applied
+    if not root:
+        historical_nodes = collect_historical_tasks(
+            nodes=merged_nodes, dag_id=dag_id, run_ids=run_ids, session=session
+        )
+        _merge_node_dicts(merged_nodes, historical_nodes)

Review Comment:
   I don't understand why we need this in the first place. We do iterate over 
the serialized dags, those should have the 'historical dependency'.
   
   I just tried removing completely this piece of code 
   ```
       # Ensure historical tasks (e.g. removed) that exist in TIs for the 
selected runs are represented
       # def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
       #     ids: set[str] = set()
       #     for n in nodes:
       #         nid = n.get("id")
       #         if nid:
       #             ids.add(nid)
       #         children = n.get("children")
       #         if children:
       #             ids |= _collect_ids(children)  # recurse
       #     return ids
   
       # existing_ids = _collect_ids(merged_nodes)
       # historical_tasks = session.execute(
       #     select(TaskInstance.task_id, TaskInstance.task_display_name)
       #     .join(TaskInstance.dag_run)
       #     .where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
       #     .distinct()
       # )
       # for task_id, task_display_name in historical_tasks:
       #     if task_id not in existing_ids:
       #         merged_nodes.append(
       #             {
       #                 "id": task_id,
       #                 "label": task_display_name,
       #                 "is_mapped": None,
       #                 "children": None,
       #             }
       #         )
   ```
   And tests are still green. 🤔 do we still need that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to