This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d7c99b99ebb AIP-65: Add DagVersion to TaskInstanceHistory RESTAPI Response (#46489)
d7c99b99ebb is described below
commit d7c99b99ebbaab779db6edaf321368d54c07ef25
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Wed Feb 5 22:46:11 2025 +0100
AIP-65: Add DagVersion to TaskInstanceHistory RESTAPI Response (#46489)
* AIP-65: Add DagVersion to TaskInstanceHistory RESTAPI Response
This PR adds dag_version to the TaskInstanceHistory REST API response.
Closes: #45993
* fixup! AIP-65: Add DagVersion to TaskInstanceHistory RESTAPI Response
---
.../core_api/datamodels/task_instances.py | 1 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 5 +
.../core_api/routes/public/task_instances.py | 15 +--
airflow/models/taskinstancehistory.py | 8 ++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 11 +++
airflow/ui/openapi-gen/requests/types.gen.ts | 1 +
.../core_api/routes/public/test_task_instances.py | 107 +++++++++++++++++++++
7 files changed, 142 insertions(+), 6 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 505b9da207c..e30bf4928e0 100644
--- a/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -154,6 +154,7 @@ class TaskInstanceHistoryResponse(BaseModel):
pid: int | None
executor: str | None
executor_config: Annotated[str, BeforeValidator(str)]
+ dag_version: DagVersionResponse | None
class TaskInstanceHistoryCollectionResponse(BaseModel):
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index cab7daffbbe..4c3ebd13496 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -9794,6 +9794,10 @@ components:
executor_config:
type: string
title: Executor Config
+ dag_version:
+ anyOf:
+ - $ref: '#/components/schemas/DagVersionResponse'
+ - type: 'null'
type: object
required:
- task_id
@@ -9819,6 +9823,7 @@ components:
- pid
- executor
- executor_config
+ - dag_version
title: TaskInstanceHistoryResponse
description: TaskInstanceHistory serializer for responses.
TaskInstanceResponse:
diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py
index d3a67b98049..51953de95bf 100644
--- a/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -272,11 +272,15 @@ def get_task_instance_tries(
"""Get list of task instances history."""
def _query(orm_object: Base) -> Select:
- query = select(orm_object).where(
- orm_object.dag_id == dag_id,
- orm_object.run_id == dag_run_id,
- orm_object.task_id == task_id,
- orm_object.map_index == map_index,
+ query = (
+ select(orm_object)
+ .where(
+ orm_object.dag_id == dag_id,
+ orm_object.run_id == dag_run_id,
+ orm_object.task_id == task_id,
+ orm_object.map_index == map_index,
+ )
+ .options(joinedload(orm_object.dag_version))
)
return query
@@ -291,7 +295,6 @@ def get_task_instance_tries(
status.HTTP_404_NOT_FOUND,
f"The Task Instance with dag_id: `{dag_id}`, run_id:
`{dag_run_id}`, task_id: `{task_id}` and map_index: `{map_index}` was not
found",
)
-
return TaskInstanceHistoryCollectionResponse(
task_instances=cast(list[TaskInstanceHistoryResponse], task_instances),
total_entries=len(task_instances),
diff --git a/airflow/models/taskinstancehistory.py b/airflow/models/taskinstancehistory.py
index e97e6de22ec..d99cd34f3b8 100644
--- a/airflow/models/taskinstancehistory.py
+++ b/airflow/models/taskinstancehistory.py
@@ -33,6 +33,7 @@ from sqlalchemy import (
text,
)
from sqlalchemy.ext.mutable import MutableDict
+from sqlalchemy.orm import relationship
from sqlalchemy_utils import UUIDType
from airflow.models.base import Base, StringID
@@ -94,6 +95,13 @@ class TaskInstanceHistory(Base):
task_display_name = Column("task_display_name", String(2000),
nullable=True)
dag_version_id = Column(UUIDType(binary=False))
+ dag_version = relationship(
+ "DagVersion",
+ primaryjoin="TaskInstanceHistory.dag_version_id == DagVersion.id",
+ viewonly=True,
+ foreign_keys=[dag_version_id],
+ )
+
def __init__(
self,
ti: TaskInstance,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index e5ae3ca3487..9ea975f97f4 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -4746,6 +4746,16 @@ export const $TaskInstanceHistoryResponse = {
type: "string",
title: "Executor Config",
},
+ dag_version: {
+ anyOf: [
+ {
+ $ref: "#/components/schemas/DagVersionResponse",
+ },
+ {
+ type: "null",
+ },
+ ],
+ },
},
type: "object",
required: [
@@ -4772,6 +4782,7 @@ export const $TaskInstanceHistoryResponse = {
"pid",
"executor",
"executor_config",
+ "dag_version",
],
title: "TaskInstanceHistoryResponse",
description: "TaskInstanceHistory serializer for responses.",
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 1283a77fe55..bfe93552316 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1239,6 +1239,7 @@ export type TaskInstanceHistoryResponse = {
pid: number | null;
executor: string | null;
executor_config: string;
+ dag_version: DagVersionResponse | null;
};
/**
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index 7cf3f251a2d..00910237728 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -1604,6 +1604,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
"try_number": 1,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
}
@pytest.mark.parametrize("try_number", [1, 2])
@@ -1638,6 +1639,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
"try_number": try_number,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
}
@pytest.mark.parametrize("try_number", [1, 2])
@@ -1701,6 +1703,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
"try_number": try_number,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
}
def test_should_respond_200_with_task_state_in_deferred(self, test_client,
session):
@@ -1762,6 +1765,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
"try_number": 1,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
}
def test_should_respond_200_with_task_state_in_removed(self, test_client,
session):
@@ -1797,6 +1801,7 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
"try_number": 1,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
}
def test_raises_404_for_nonexistent_task_instance(self, test_client,
session):
@@ -1810,6 +1815,54 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
"detail": "The Task Instance with dag_id:
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id:
`nonexistent_task`, try_number: `0` and map_index: `-1` was not found"
}
+ @pytest.mark.parametrize(
+ "run_id, expected_version_number",
+ [
+ ("run1", 1),
+ ("run2", 2),
+ ("run3", 3),
+ ],
+ )
+ @pytest.mark.usefixtures("make_dag_with_multiple_versions")
+ def test_should_respond_200_with_versions(self, test_client, run_id,
expected_version_number):
+ response = test_client.get(
+
f"/public/dags/dag_with_multiple_versions/dagRuns/{run_id}/taskInstances/task1/tries/0"
+ )
+ assert response.status_code == 200
+ assert response.json() == {
+ "task_id": "task1",
+ "dag_id": "dag_with_multiple_versions",
+ "dag_run_id": run_id,
+ "map_index": -1,
+ "start_date": None,
+ "end_date": mock.ANY,
+ "duration": None,
+ "state": None,
+ "try_number": 0,
+ "max_tries": 0,
+ "task_display_name": "task1",
+ "hostname": "",
+ "unixname": getuser(),
+ "pool": "default_pool",
+ "pool_slots": 1,
+ "queue": "default",
+ "priority_weight": 1,
+ "operator": "EmptyOperator",
+ "queued_when": None,
+ "scheduled_when": None,
+ "pid": None,
+ "executor": None,
+ "executor_config": "{}",
+ "dag_version": {
+ "id": mock.ANY,
+ "version_number": expected_version_number,
+ "dag_id": "dag_with_multiple_versions",
+ "bundle_name": "dag_maker",
+ "bundle_version": None,
+ "created_at": mock.ANY,
+ },
+ }
+
class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
@pytest.mark.parametrize(
@@ -2587,6 +2640,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
"try_number": 1,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
},
{
"dag_id": "example_python_operator",
@@ -2612,6 +2666,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
"try_number": 2,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
},
],
"total_entries": 2,
@@ -2658,6 +2713,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
"try_number": 1,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
},
],
"total_entries": 1,
@@ -2725,6 +2781,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
"try_number": 1,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
},
{
"dag_id": "example_python_operator",
@@ -2750,6 +2807,7 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
"try_number": 2,
"unixname": getuser(),
"dag_run_id": "TEST_DAG_RUN_ID",
+ "dag_version": None,
},
],
"total_entries": 2,
@@ -2766,6 +2824,55 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
"detail": "The Task Instance with dag_id:
`example_python_operator`, run_id: `TEST_DAG_RUN_ID`, task_id:
`non_existent_task` and map_index: `-1` was not found"
}
+ @pytest.mark.parametrize(
+ "run_id, expected_version_number",
+ [
+ ("run1", 1),
+ ("run2", 2),
+ ("run3", 3),
+ ],
+ )
+ @pytest.mark.usefixtures("make_dag_with_multiple_versions")
+ def test_should_respond_200_with_versions(self, test_client, run_id,
expected_version_number):
+ response = test_client.get(
+
f"/public/dags/dag_with_multiple_versions/dagRuns/{run_id}/taskInstances/task1/tries"
+ )
+ assert response.status_code == 200
+
+ assert response.json()["task_instances"][0] == {
+ "task_id": "task1",
+ "dag_id": "dag_with_multiple_versions",
+ "dag_run_id": run_id,
+ "map_index": -1,
+ "start_date": None,
+ "end_date": mock.ANY,
+ "duration": None,
+ "state": mock.ANY,
+ "try_number": 0,
+ "max_tries": 0,
+ "task_display_name": "task1",
+ "hostname": "",
+ "unixname": getuser(),
+ "pool": "default_pool",
+ "pool_slots": 1,
+ "queue": "default",
+ "priority_weight": 1,
+ "operator": "EmptyOperator",
+ "queued_when": None,
+ "scheduled_when": None,
+ "pid": None,
+ "executor": None,
+ "executor_config": "{}",
+ "dag_version": {
+ "id": mock.ANY,
+ "version_number": expected_version_number,
+ "dag_id": "dag_with_multiple_versions",
+ "bundle_name": "dag_maker",
+ "bundle_version": None,
+ "created_at": mock.ANY,
+ },
+ }
+
class TestPatchTaskInstance(TestTaskInstanceEndpoint):
ENDPOINT_URL = (