This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 7fd4133c717 [v3-1-test] Fix task group lookup using wrong DAG version 
for historical runs (#63360) (#63433)
7fd4133c717 is described below

commit 7fd4133c717ecec0f1af5d3caf427fcd996352f6
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 12 13:45:33 2026 +0100

    [v3-1-test] Fix task group lookup using wrong DAG version for historical 
runs (#63360) (#63433)
    
    * Fix task group lookup using wrong DAG version for historical runs
    
    When a task group is renamed between DAG versions, the API's
    get_task_instances endpoint was resolving task groups against the
    latest DAG version instead of the version the run was created with.
    This caused 404 errors when clicking on task groups in the grid view
    for historical runs.
    
    The fix changes get_dag_for_run_or_latest_version to prefer the
    run's created_dag_version_id, falling back to the existing behavior
    only when unavailable.
    
    * Move inline comment to docstring in get_dag_for_run_or_latest_version
    (cherry picked from commit 48cc031600372d1695efd3847ec56270e3c3e7cd)
    
    Co-authored-by: Pierre Jeambrun <[email protected]>
---
 .../src/airflow/api_fastapi/common/dagbag.py       | 16 +++++++-
 .../core_api/routes/public/test_task_instances.py  | 44 +++++++++++++++++++++-
 2 files changed, 58 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py 
b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
index 491a7131acc..d8cc98e7608 100644
--- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
+++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
@@ -70,9 +70,23 @@ def get_dag_for_run(dag_bag: DBDagBag, dag_run: DagRun, 
session: Session) -> Ser
 def get_dag_for_run_or_latest_version(
     dag_bag: DBDagBag, dag_run: DagRun | None, dag_id: str | None, session: 
Session
 ) -> SerializedDAG:
+    """
+    Retrieve the serialized DAG for a specific run, or the latest version if 
no run is given.
+
+    When a dag_run is provided, we prefer the exact DAG version the run was 
created with
+    (``created_dag_version_id``) so that task group lookups, operator 
metadata, etc. match
+    the DAG structure at the time of the run.
+
+    This is necessary because ``get_dag_for_run`` delegates to 
``_version_from_dag_run``
+    which, for unversioned bundles (e.g. ``LocalDagBundle``), falls back to 
the *latest*
+    ``DagVersion``.
+    """
     dag: SerializedDAG | None = None
     if dag_run:
-        dag = dag_bag.get_dag_for_run(dag_run, session=session)
+        if dag_run.created_dag_version_id:
+            dag = dag_bag._get_dag(dag_run.created_dag_version_id, 
session=session)
+        if not dag:
+            dag = dag_bag.get_dag_for_run(dag_run, session=session)
     elif dag_id:
         dag = dag_bag.get_latest_version_of_dag(dag_id, session=session)
     if not dag:
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 8b3cb628da0..b0e9d0504ab 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -40,7 +40,8 @@ from airflow.models.renderedtifields import 
RenderedTaskInstanceFields as RTIF
 from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.models.taskmap import TaskMap
 from airflow.models.trigger import Trigger
-from airflow.sdk import BaseOperator
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import BaseOperator, TaskGroup
 from airflow.utils.platform import getuser
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.types import DagRunType
@@ -1499,6 +1500,47 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
         assert (num_entries_batch1 + num_entries_batch2) == ti_count
         assert response_batch1 != response_batch2
 
+    def test_task_group_filter_uses_run_version_not_latest(self, test_client, 
dag_maker, session):
+        """
+        Task group lookup should use the DAG version from the run, not the 
latest version.
+
+        When a task group is renamed between versions, clicking on a 
historical run's
+        task group in the grid should still resolve correctly against the 
version
+        that run was created with — not the latest version where the group may 
have
+        a different name, i.e serialized_dag might not have that taskgroup 
anymore.
+        """
+        dag_id = "test_tg_version"
+
+        # Version 1: task group named "process_data"
+        with dag_maker(dag_id, session=session):
+            with TaskGroup(group_id="process_data"):
+                EmptyOperator(task_id="step_1")
+        dag_maker.create_dagrun(run_id="run_v1")
+        session.commit()
+
+        # Version 2: task group renamed to "process_data_v2"
+        with dag_maker(dag_id, session=session):
+            with TaskGroup(group_id="process_data_v2"):
+                EmptyOperator(task_id="step_1")
+        session.commit()
+
+        # The run was created with v1 which had "process_data".
+        # Querying with the old group name must succeed.
+        response = test_client.get(
+            f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
+            params={"task_group_id": "process_data"},
+        )
+        assert response.status_code == 200, response.json()
+        assert response.json()["total_entries"] == 1
+        assert response.json()["task_instances"][0]["task_id"] == 
"process_data.step_1"
+
+        # The new group name should NOT be found in the old run's version.
+        response = test_client.get(
+            f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
+            params={"task_group_id": "process_data_v2"},
+        )
+        assert response.status_code == 404
+
 
 class TestGetTaskDependencies(TestTaskInstanceEndpoint):
     def setup_method(self):

Reply via email to