This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 7fd4133c717 [v3-1-test] Fix task group lookup using wrong DAG version
for historical runs (#63360) (#63433)
7fd4133c717 is described below
commit 7fd4133c717ecec0f1af5d3caf427fcd996352f6
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 12 13:45:33 2026 +0100
[v3-1-test] Fix task group lookup using wrong DAG version for historical
runs (#63360) (#63433)
* Fix task group lookup using wrong DAG version for historical runs
When a task group is renamed between DAG versions, the API's
get_task_instances endpoint was resolving task groups against the
latest DAG version instead of the version the run was created with.
This caused 404 errors when clicking on task groups in the grid view
for historical runs.
The fix changes get_dag_for_run_or_latest_version to prefer the
run's created_dag_version_id, falling back to the existing behavior
only when that ID is unset or no serialized DAG is found for it.
* Move inline comment to docstring in get_dag_for_run_or_latest_version
(cherry picked from commit 48cc031600372d1695efd3847ec56270e3c3e7cd)
Co-authored-by: Pierre Jeambrun <[email protected]>
---
.../src/airflow/api_fastapi/common/dagbag.py | 16 +++++++-
.../core_api/routes/public/test_task_instances.py | 44 +++++++++++++++++++++-
2 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
index 491a7131acc..d8cc98e7608 100644
--- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
+++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
@@ -70,9 +70,23 @@ def get_dag_for_run(dag_bag: DBDagBag, dag_run: DagRun,
session: Session) -> Ser
def get_dag_for_run_or_latest_version(
dag_bag: DBDagBag, dag_run: DagRun | None, dag_id: str | None, session:
Session
) -> SerializedDAG:
+ """
+ Retrieve the serialized DAG for a specific run, or the latest version if
no run is given.
+
+ When a dag_run is provided, we prefer the exact DAG version the run was
created with
+ (``created_dag_version_id``) so that task group lookups, operator
metadata, etc. match
+ the DAG structure at the time of the run.
+
+ This is necessary because ``get_dag_for_run`` delegates to
``_version_from_dag_run``
+ which, for unversioned bundles (e.g. ``LocalDagBundle``), falls back to
the *latest*
+ ``DagVersion``.
+ """
dag: SerializedDAG | None = None
if dag_run:
- dag = dag_bag.get_dag_for_run(dag_run, session=session)
+ if dag_run.created_dag_version_id:
+ dag = dag_bag._get_dag(dag_run.created_dag_version_id,
session=session)
+ if not dag:
+ dag = dag_bag.get_dag_for_run(dag_run, session=session)
elif dag_id:
dag = dag_bag.get_latest_version_of_dag(dag_id, session=session)
if not dag:
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 8b3cb628da0..b0e9d0504ab 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -40,7 +40,8 @@ from airflow.models.renderedtifields import
RenderedTaskInstanceFields as RTIF
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.models.taskmap import TaskMap
from airflow.models.trigger import Trigger
-from airflow.sdk import BaseOperator
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import BaseOperator, TaskGroup
from airflow.utils.platform import getuser
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunType
@@ -1499,6 +1500,47 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
assert (num_entries_batch1 + num_entries_batch2) == ti_count
assert response_batch1 != response_batch2
+ def test_task_group_filter_uses_run_version_not_latest(self, test_client,
dag_maker, session):
+ """
+ Task group lookup should use the DAG version from the run, not the
latest version.
+
+ When a task group is renamed between versions, clicking on a
historical run's
+ task group in the grid should still resolve correctly against the
version
+ that run was created with — not the latest version where the group may
have
+ a different name, i.e serialized_dag might not have that taskgroup
anymore.
+ """
+ dag_id = "test_tg_version"
+
+ # Version 1: task group named "process_data"
+ with dag_maker(dag_id, session=session):
+ with TaskGroup(group_id="process_data"):
+ EmptyOperator(task_id="step_1")
+ dag_maker.create_dagrun(run_id="run_v1")
+ session.commit()
+
+ # Version 2: task group renamed to "process_data_v2"
+ with dag_maker(dag_id, session=session):
+ with TaskGroup(group_id="process_data_v2"):
+ EmptyOperator(task_id="step_1")
+ session.commit()
+
+ # The run was created with v1 which had "process_data".
+ # Querying with the old group name must succeed.
+ response = test_client.get(
+ f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
+ params={"task_group_id": "process_data"},
+ )
+ assert response.status_code == 200, response.json()
+ assert response.json()["total_entries"] == 1
+ assert response.json()["task_instances"][0]["task_id"] ==
"process_data.step_1"
+
+ # The new group name should NOT be found in the old run's version.
+ response = test_client.get(
+ f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
+ params={"task_group_id": "process_data_v2"},
+ )
+ assert response.status_code == 404
+
class TestGetTaskDependencies(TestTaskInstanceEndpoint):
def setup_method(self):