pierrejeambrun commented on code in PR #48477:
URL: https://github.com/apache/airflow/pull/48477#discussion_r2018266941


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py:
##########
@@ -146,33 +153,36 @@ def grid_data(
             )
         )
 
-    task_node_map = get_task_group_map(dag=dag)
     # Group the Task Instances by Parent Task (TaskGroup or Mapped) and All Task Instances
     parent_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
     all_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
-    for ti in task_instances:
-        # Skip the Task Instances if upstream/downstream filtering is applied or if task is deleted
-        if (
-            task_node_map_exclude and ti.task_id not in task_node_map_exclude.keys()
-        ) or ti.task_id not in task_node_map.keys():
-            continue
-
-        # Populate the Grouped Task Instances (All Task Instances except the Parent Task Instances)
-        if ti.task_id in get_child_task_map(
-            parent_task_id=task_node_map[ti.task_id]["parent_id"], task_node_map=task_node_map
-        ):
-            all_tis[(ti.task_id, ti.run_id)].append(ti)
-        # Populate the Parent Task Instances
-        parent_id = task_node_map[ti.task_id]["parent_id"]
-        if not parent_id and task_node_map[ti.task_id]["is_group"]:
-            parent_tis[(ti.task_id, ti.run_id)].append(ti)
-        elif parent_id and task_node_map[parent_id]["is_group"]:
-            parent_tis[(parent_id, ti.run_id)].append(ti)
+
+    for _, tis in tis_by_run_id.items():

Review Comment:
   Nit: the key is unused in this loop, so iterate over the values directly.
   ```suggestion
       for tis in tis_by_run_id.values():
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py:
##########
@@ -237,3 +243,41 @@ def fill_task_instance_summaries(
                 note=ti_note,
             )
         )
+
+
+def get_structure_from_dag(dag: DAG) -> StructureDataResponse:
+    """If we do not have TIs, we just get the structure from the DAG."""
+    nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()]
+    return StructureDataResponse(nodes=nodes, edges=[])
+
+
+def get_combined_structure(task_instances):
+    """Given task instances with varying DAG versions, get a combined structure."""
+    merged_nodes = []
+    # we dedup with serdag, as serdag.dag varies somehow?
+    serdags = {ti.dag_version.serialized_dag for ti in task_instances}
+    dags = [serdag.dag for serdag in serdags]
+    for dag in dags:
+        nodes = [task_group_to_dict(child) for child in dag.task_group.topological_sort()]
+        _merge_node_dicts(merged_nodes, nodes)
+
+    return StructureDataResponse(nodes=merged_nodes, edges=[])
+
+
+def _merge_node_dicts(current, new) -> None:
+    current_ids = {node["id"] for node in current}
+    for node in new:
+        if node["id"] in current_ids:
+            current_node = _get_node_by_id(current, node["id"])
+            # if we have children, merge those as well
+            if "children" in current_node:
+                _merge_node_dicts(current_node["children"], node["children"])

Review Comment:
   Nice, that's the part I was looking for



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to