This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d7c99b99ebb AIP-65: Add DagVersion to TaskInstanceHistory RESTAPI Response (#46489)
d7c99b99ebb is described below

commit d7c99b99ebbaab779db6edaf321368d54c07ef25
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Feb 5 22:46:11 2025 +0100

    AIP-65: Add DagVersion to TaskInstanceHistory RESTAPI Response (#46489)
    
    * AIP-65: Add DagVersion to TaskInstanceHistory RESTAPI Response
    
    This PR adds dag_version to TaskInstanceHistory RESTAPI response.
    Closes: #45993
    
    * fixup! AIP-65: Add DagVersion to TaskInstanceHistory RESTAPI Response
---
 .../core_api/datamodels/task_instances.py          |   1 +
 .../api_fastapi/core_api/openapi/v1-generated.yaml |   5 +
 .../core_api/routes/public/task_instances.py       |  15 +--
 airflow/models/taskinstancehistory.py              |   8 ++
 airflow/ui/openapi-gen/requests/schemas.gen.ts     |  11 +++
 airflow/ui/openapi-gen/requests/types.gen.ts       |   1 +
 .../core_api/routes/public/test_task_instances.py  | 107 +++++++++++++++++++++
 7 files changed, 142 insertions(+), 6 deletions(-)

diff --git a/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 505b9da207c..e30bf4928e0 100644
--- a/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -154,6 +154,7 @@ class TaskInstanceHistoryResponse(BaseModel):
     pid: int | None
     executor: str | None
     executor_config: Annotated[str, BeforeValidator(str)]
+    dag_version: DagVersionResponse | None
 
 
 class TaskInstanceHistoryCollectionResponse(BaseModel):
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index cab7daffbbe..4c3ebd13496 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -9794,6 +9794,10 @@ components:
         executor_config:
           type: string
           title: Executor Config
+        dag_version:
+          anyOf:
+          - $ref: '#/components/schemas/DagVersionResponse'
+          - type: 'null'
       type: object
       required:
       - task_id
@@ -9819,6 +9823,7 @@ components:
       - pid
       - executor
       - executor_config
+      - dag_version
       title: TaskInstanceHistoryResponse
       description: TaskInstanceHistory serializer for responses.
     TaskInstanceResponse:
diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py
index d3a67b98049..51953de95bf 100644
--- a/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -272,11 +272,15 @@ def get_task_instance_tries(
     """Get list of task instances history."""
 
     def _query(orm_object: Base) -> Select:
-        query = select(orm_object).where(
-            orm_object.dag_id == dag_id,
-            orm_object.run_id == dag_run_id,
-            orm_object.task_id == task_id,
-            orm_object.map_index == map_index,
+        query = (
+            select(orm_object)
+            .where(
+                orm_object.dag_id == dag_id,
+                orm_object.run_id == dag_run_id,
+                orm_object.task_id == task_id,
+                orm_object.map_index == map_index,
+            )
+            .options(joinedload(orm_object.dag_version))
         )
         return query
 
@@ -291,7 +295,6 @@ def get_task_instance_tries(
             status.HTTP_404_NOT_FOUND,
             f"The Task Instance with dag_id: `{dag_id}`, run_id: `{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not found",
         )
-
     return TaskInstanceHistoryCollectionResponse(
         task_instances=cast(list[TaskInstanceHistoryResponse], task_instances),
         total_entries=len(task_instances),
diff --git a/airflow/models/taskinstancehistory.py b/airflow/models/taskinstancehistory.py
index e97e6de22ec..d99cd34f3b8 100644
--- a/airflow/models/taskinstancehistory.py
+++ b/airflow/models/taskinstancehistory.py
@@ -33,6 +33,7 @@ from sqlalchemy import (
     text,
 )
 from sqlalchemy.ext.mutable import MutableDict
+from sqlalchemy.orm import relationship
 from sqlalchemy_utils import UUIDType
 
 from airflow.models.base import Base, StringID
@@ -94,6 +95,13 @@ class TaskInstanceHistory(Base):
     task_display_name = Column("task_display_name", String(2000), nullable=True)
     dag_version_id = Column(UUIDType(binary=False))
 
+    dag_version = relationship(
+        "DagVersion",
+        primaryjoin="TaskInstanceHistory.dag_version_id == DagVersion.id",
+        viewonly=True,
+        foreign_keys=[dag_version_id],
+    )
+
     def __init__(
         self,
         ti: TaskInstance,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e5ae3ca3487..9ea975f97f4 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -4746,6 +4746,16 @@ export const $TaskInstanceHistoryResponse = {
       type: "string",
       title: "Executor Config",
     },
+    dag_version: {
+      anyOf: [
+        {
+          $ref: "#/components/schemas/DagVersionResponse",
+        },
+        {
+          type: "null",
+        },
+      ],
+    },
   },
   type: "object",
   required: [
@@ -4772,6 +4782,7 @@ export const $TaskInstanceHistoryResponse = {
     "pid",
     "executor",
     "executor_config",
+    "dag_version",
   ],
   title: "TaskInstanceHistoryResponse",
   description: "TaskInstanceHistory serializer for responses.",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 1283a77fe55..bfe93552316 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1239,6 +1239,7 @@ export type TaskInstanceHistoryResponse = {
   pid: number | null;
   executor: string | null;
   executor_config: string;
+  dag_version: DagVersionResponse | null;
 };
 
 /**
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index 7cf3f251a2d..00910237728 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -1604,6 +1604,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
             "try_number": 1,
             "unixname": getuser(),
             "dag_run_id": "TEST_DAG_RUN_ID",
+            "dag_version": None,
         }
 
     @pytest.mark.parametrize("try_number", [1, 2])
@@ -1638,6 +1639,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
             "try_number": try_number,
             "unixname": getuser(),
             "dag_run_id": "TEST_DAG_RUN_ID",
+            "dag_version": None,
         }
 
     @pytest.mark.parametrize("try_number", [1, 2])
@@ -1701,6 +1703,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
                 "try_number": try_number,
                 "unixname": getuser(),
                 "dag_run_id": "TEST_DAG_RUN_ID",
+                "dag_version": None,
             }
 
     def test_should_respond_200_with_task_state_in_deferred(self, test_client, session):
@@ -1762,6 +1765,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
             "try_number": 1,
             "unixname": getuser(),
             "dag_run_id": "TEST_DAG_RUN_ID",
+            "dag_version": None,
         }
 
     def test_should_respond_200_with_task_state_in_removed(self, test_client, session):
@@ -1797,6 +1801,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
             "try_number": 1,
             "unixname": getuser(),
             "dag_run_id": "TEST_DAG_RUN_ID",
+            "dag_version": None,
         }
 
     def test_raises_404_for_nonexistent_task_instance(self, test_client, session):
@@ -1810,6 +1815,54 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
             "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `nonexistent_task`, try_number: `0` and map_index: `-1` was not found"
         }
 
+    @pytest.mark.parametrize(
+        "run_id, expected_version_number",
+        [
+            ("run1", 1),
+            ("run2", 2),
+            ("run3", 3),
+        ],
+    )
+    @pytest.mark.usefixtures("make_dag_with_multiple_versions")
+    def test_should_respond_200_with_versions(self, test_client, run_id, expected_version_number):
+        response = test_client.get(
+            f"/public/dags/dag_with_multiple_versions/dagRuns/{run_id}/taskInstances/task1/tries/0"
+        )
+        assert response.status_code == 200
+        assert response.json() == {
+            "task_id": "task1",
+            "dag_id": "dag_with_multiple_versions",
+            "dag_run_id": run_id,
+            "map_index": -1,
+            "start_date": None,
+            "end_date": mock.ANY,
+            "duration": None,
+            "state": None,
+            "try_number": 0,
+            "max_tries": 0,
+            "task_display_name": "task1",
+            "hostname": "",
+            "unixname": getuser(),
+            "pool": "default_pool",
+            "pool_slots": 1,
+            "queue": "default",
+            "priority_weight": 1,
+            "operator": "EmptyOperator",
+            "queued_when": None,
+            "scheduled_when": None,
+            "pid": None,
+            "executor": None,
+            "executor_config": "{}",
+            "dag_version": {
+                "id": mock.ANY,
+                "version_number": expected_version_number,
+                "dag_id": "dag_with_multiple_versions",
+                "bundle_name": "dag_maker",
+                "bundle_version": None,
+                "created_at": mock.ANY,
+            },
+        }
+
 
 class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
     @pytest.mark.parametrize(
@@ -2587,6 +2640,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
                     "try_number": 1,
                     "unixname": getuser(),
                     "dag_run_id": "TEST_DAG_RUN_ID",
+                    "dag_version": None,
                 },
                 {
                     "dag_id": "example_python_operator",
@@ -2612,6 +2666,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
                     "try_number": 2,
                     "unixname": getuser(),
                     "dag_run_id": "TEST_DAG_RUN_ID",
+                    "dag_version": None,
                 },
             ],
             "total_entries": 2,
@@ -2658,6 +2713,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
                     "try_number": 1,
                     "unixname": getuser(),
                     "dag_run_id": "TEST_DAG_RUN_ID",
+                    "dag_version": None,
                 },
             ],
             "total_entries": 1,
@@ -2725,6 +2781,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
                         "try_number": 1,
                         "unixname": getuser(),
                         "dag_run_id": "TEST_DAG_RUN_ID",
+                        "dag_version": None,
                     },
                     {
                         "dag_id": "example_python_operator",
@@ -2750,6 +2807,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
                         "try_number": 2,
                         "unixname": getuser(),
                         "dag_run_id": "TEST_DAG_RUN_ID",
+                        "dag_version": None,
                     },
                 ],
                 "total_entries": 2,
@@ -2766,6 +2824,55 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
             "detail": "The Task Instance with dag_id: `example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id: `non_existent_task` and map_index: `-1` was not found"
         }
 
+    @pytest.mark.parametrize(
+        "run_id, expected_version_number",
+        [
+            ("run1", 1),
+            ("run2", 2),
+            ("run3", 3),
+        ],
+    )
+    @pytest.mark.usefixtures("make_dag_with_multiple_versions")
+    def test_should_respond_200_with_versions(self, test_client, run_id, expected_version_number):
+        response = test_client.get(
+            f"/public/dags/dag_with_multiple_versions/dagRuns/{run_id}/taskInstances/task1/tries"
+        )
+        assert response.status_code == 200
+
+        assert response.json()["task_instances"][0] == {
+            "task_id": "task1",
+            "dag_id": "dag_with_multiple_versions",
+            "dag_run_id": run_id,
+            "map_index": -1,
+            "start_date": None,
+            "end_date": mock.ANY,
+            "duration": None,
+            "state": mock.ANY,
+            "try_number": 0,
+            "max_tries": 0,
+            "task_display_name": "task1",
+            "hostname": "",
+            "unixname": getuser(),
+            "pool": "default_pool",
+            "pool_slots": 1,
+            "queue": "default",
+            "priority_weight": 1,
+            "operator": "EmptyOperator",
+            "queued_when": None,
+            "scheduled_when": None,
+            "pid": None,
+            "executor": None,
+            "executor_config": "{}",
+            "dag_version": {
+                "id": mock.ANY,
+                "version_number": expected_version_number,
+                "dag_id": "dag_with_multiple_versions",
+                "bundle_name": "dag_maker",
+                "bundle_version": None,
+                "created_at": mock.ANY,
+            },
+        }
+
 
 class TestPatchTaskInstance(TestTaskInstanceEndpoint):
     ENDPOINT_URL = (

Reply via email to