This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 26e7b279870 [v3-1-test] UI: Fix Grid for cleared runs when tasks were removed (#56085) (#56297)
26e7b279870 is described below

commit 26e7b2798704986ac476ccc31853ebfd26b1e7f0
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Oct 13 15:50:41 2025 +0200

    [v3-1-test] UI: Fix Grid for cleared runs when tasks were removed (#56085) (#56297)
    
    Ensure removed/historical tasks from selected runs are visible in
    Grid even if they no longer exist in the current DAG version.
    
    We now:
    - Include synthetic leaf nodes for task_ids present in TIs but
    missing from the serialized DAG in both grid/structure and grid/ti_summaries.
    - Aggregate TI states for these synthetic nodes.
    
    Add tests covering structure and TI summaries for removed tasks.
    (cherry picked from commit 77fae41380dc9f1a8fc8a449beea0e63056c291c)
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py | 77 +++++++++++++++++++---
 .../api_fastapi/core_api/services/ui/grid.py       |  7 +-
 .../api_fastapi/core_api/routes/ui/test_grid.py    | 46 +++++++++++++
 3 files changed, 119 insertions(+), 11 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index fea5abc52e0..6099fb55948 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import collections
-from typing import TYPE_CHECKING, Annotated
+from typing import TYPE_CHECKING, Annotated, Any
 
 import structlog
 from fastapi import Depends, HTTPException, status
@@ -48,6 +48,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_
 from airflow.api_fastapi.core_api.security import requires_access_dag
 from airflow.api_fastapi.core_api.services.ui.grid import (
     _find_aggregates,
+    _get_aggs_for_node,
     _merge_node_dicts,
 )
 from airflow.api_fastapi.core_api.services.ui.task_group import (
@@ -156,7 +157,7 @@ def get_dag_structure(
     task_group_sort = get_task_group_children_getter()
     if not run_ids:
         nodes = [task_group_to_dict_grid(x) for x in 
task_group_sort(latest_dag.task_group)]
-        return nodes
+        return [GridNodeResponse(**n) for n in nodes]
 
     serdags = session.scalars(
         select(SerializedDagModel).where(
@@ -170,7 +171,7 @@ def get_dag_structure(
             )
         )
     )
-    merged_nodes: list[GridNodeResponse] = []
+    merged_nodes: list[dict[str, Any]] = []
     dags = [latest_dag]
     for serdag in serdags:
         if serdag:
@@ -179,7 +180,37 @@ def get_dag_structure(
         nodes = [task_group_to_dict_grid(x) for x in 
task_group_sort(dag.task_group)]
         _merge_node_dicts(merged_nodes, nodes)
 
-    return merged_nodes
+    # Ensure historical tasks (e.g. removed) that exist in TIs for the 
selected runs are represented
+    def _collect_ids(nodes: list[dict[str, Any]]) -> set[str]:
+        ids: set[str] = set()
+        for n in nodes:
+            nid = n.get("id")
+            if nid:
+                ids.add(nid)
+            children = n.get("children")
+            if children:
+                ids |= _collect_ids(children)  # recurse
+        return ids
+
+    existing_ids = _collect_ids(merged_nodes)
+    historical_task_ids = session.scalars(
+        select(TaskInstance.task_id)
+        .join(TaskInstance.dag_run)
+        .where(TaskInstance.dag_id == dag_id, DagRun.id.in_(run_ids))
+        .distinct()
+    )
+    for task_id in historical_task_ids:
+        if task_id not in existing_ids:
+            merged_nodes.append(
+                {
+                    "id": task_id,
+                    "label": task_id,
+                    "is_mapped": None,
+                    "children": None,
+                }
+            )
+
+    return [GridNodeResponse(**n) for n in merged_nodes]
 
 
 @grid_router.get(
@@ -345,19 +376,47 @@ def get_grid_ti_summaries(
         assert serdag
 
     def get_node_sumaries():
+        yielded_task_ids: set[str] = set()
+
+        # Yield all nodes discoverable from the serialized DAG structure
         for node in _find_aggregates(
             node=serdag.dag.task_group,
             parent_node=None,
             ti_details=ti_details,
         ):
-            if node["type"] == "task":
-                node["child_states"] = None
-                node["min_start_date"] = None
-                node["max_end_date"] = None
+            if node["type"] in {"task", "mapped_task"}:
+                yielded_task_ids.add(node["task_id"])
+                if node["type"] == "task":
+                    node["child_states"] = None
+                    node["min_start_date"] = None
+                    node["max_end_date"] = None
             yield node
 
+        # For good history: add synthetic leaf nodes for task_ids that have 
TIs in this run
+        # but are not present in the current DAG structure (e.g. removed tasks)
+        missing_task_ids = set(ti_details.keys()) - yielded_task_ids
+        for task_id in sorted(missing_task_ids):
+            detail = ti_details[task_id]
+            # Create a leaf task node with aggregated state from its TIs
+            agg = _get_aggs_for_node(detail)
+            yield {
+                "task_id": task_id,
+                "type": "task",
+                "parent_id": None,
+                **agg,
+                # Align with leaf behavior
+                "child_states": None,
+                "min_start_date": None,
+                "max_end_date": None,
+            }
+
+    task_instances = list(get_node_sumaries())
+    # If a group id and a task id collide, prefer the group record
+    group_ids = {n.get("task_id") for n in task_instances if n.get("type") == 
"group"}
+    filtered = [n for n in task_instances if not (n.get("type") == "task" and 
n.get("task_id") in group_ids)]
+
     return {  # type: ignore[return-value]
         "run_id": run_id,
         "dag_id": dag_id,
-        "task_instances": list(get_node_sumaries()),
+        "task_instances": filtered,
     }
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
index 1f64ffcefa8..acf333c64a8 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py
@@ -85,16 +85,19 @@ def _find_aggregates(
     """Recursively fill the Task Group Map."""
     node_id = node.node_id
     parent_id = parent_node.node_id if parent_node else None
-    details = ti_details[node_id]
+    # Do not mutate ti_details by accidental key creation
+    details = ti_details.get(node_id, [])
 
     if node is None:
         return
     if isinstance(node, MappedOperator):
+        # For unmapped tasks, reflect a single None state so UI shows one 
square
+        mapped_details = details or [{"state": None, "start_date": None, 
"end_date": None}]
         yield {
             "task_id": node_id,
             "type": "mapped_task",
             "parent_id": parent_id,
-            **_get_aggs_for_node(details),
+            **_get_aggs_for_node(mapped_details),
         }
 
         return
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
index 65173dabda8..93b5ecfa487 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_grid.py
@@ -438,6 +438,39 @@ class TestGetGridDataEndpoint:
             },
         ]
 
+        # Also verify that TI summaries include a leaf entry for the removed 
task
+        ti_resp = test_client.get(f"/grid/ti_summaries/{DAG_ID_3}/run_3")
+        assert ti_resp.status_code == 200
+        ti_payload = ti_resp.json()
+        assert ti_payload["dag_id"] == DAG_ID_3
+        assert ti_payload["run_id"] == "run_3"
+        # Find the removed task summary; it should exist even if not in 
current serialized DAG structure
+        removed_ti = next(
+            (
+                n
+                for n in ti_payload["task_instances"]
+                if n["task_id"] == TASK_ID_4 and n["child_states"] is None
+            ),
+            None,
+        )
+        assert removed_ti is not None
+        # Its state should be the aggregated state of its TIs, which includes 
'removed'
+        assert removed_ti["state"] in (
+            "removed",
+            None,
+            "skipped",
+            "success",
+            "failed",
+            "running",
+            "queued",
+            "scheduled",
+            "deferred",
+            "restarting",
+            "up_for_retry",
+            "up_for_reschedule",
+            "upstream_failed",
+        )
+
     def test_get_dag_structure(self, session, test_client):
         session.commit()
         response = test_client.get(f"/grid/structure/{DAG_ID}?limit=5")
@@ -690,3 +723,16 @@ class TestGetGridDataEndpoint:
         expected = sort_dict(expected)
         actual = sort_dict(actual)
         assert actual == expected
+
+    def 
test_structure_includes_historical_removed_task_with_proper_shape(self, 
session, test_client):
+        # Ensure the structure endpoint returns synthetic node for 
historical/removed task
+        response = test_client.get(f"/grid/structure/{DAG_ID_3}")
+        assert response.status_code == 200
+        nodes = response.json()
+        # Find the historical removed task id
+        t4 = next((n for n in nodes if n["id"] == TASK_ID_4), None)
+        assert t4 is not None
+        assert t4["label"] == TASK_ID_4
+        # Optional None fields are excluded from response due to 
response_model_exclude_none=True
+        assert "is_mapped" not in t4
+        assert "children" not in t4

Reply via email to