This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 52f89dd5bc4 AIP-84 Clear Task Instance improve response (#45514)
52f89dd5bc4 is described below
commit 52f89dd5bc471b1702e7e5e67b90420e27d51d20
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Fri Jan 10 18:48:22 2025 +0800
AIP-84 Clear Task Instance improve response (#45514)
---
.../core_api/datamodels/task_instances.py | 15 --------
.../api_fastapi/core_api/openapi/v1-generated.yaml | 36 +------------------
.../core_api/routes/public/task_instances.py | 9 +++--
airflow/ui/openapi-gen/queries/queries.ts | 2 +-
airflow/ui/openapi-gen/requests/schemas.gen.ts | 41 ----------------------
airflow/ui/openapi-gen/requests/services.gen.ts | 2 +-
airflow/ui/openapi-gen/requests/types.gen.ts | 21 ++---------
.../core_api/routes/public/test_task_instances.py | 41 +++++++++++++++++++---
8 files changed, 45 insertions(+), 122 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 07a1f77ad54..3d191b96828 100644
--- a/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -220,18 +220,3 @@ class PatchTaskInstanceBody(BaseModel):
if ns not in valid_states:
raise ValueError(f"'{ns}' is not one of {valid_states}")
return ns
-
-
-class TaskInstanceReferenceResponse(BaseModel):
- """Task Instance Reference serializer for responses."""
-
- task_id: str
- dag_run_id: str = Field(validation_alias="run_id")
- dag_id: str
-
-
-class TaskInstanceReferenceCollectionResponse(BaseModel):
- """Task Instance Reference collection serializer for responses."""
-
- task_instances: list[TaskInstanceReferenceResponse]
- total_entries: int
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index bb4861a5fc3..5d06e7ab28e 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -5506,7 +5506,7 @@ paths:
content:
application/json:
schema:
- $ref: '#/components/schemas/TaskInstanceReferenceCollectionResponse'
+ $ref: '#/components/schemas/TaskInstanceCollectionResponse'
'401':
content:
application/json:
@@ -9063,40 +9063,6 @@ components:
- executor_config
title: TaskInstanceHistoryResponse
description: TaskInstanceHistory serializer for responses.
- TaskInstanceReferenceCollectionResponse:
- properties:
- task_instances:
- items:
- $ref: '#/components/schemas/TaskInstanceReferenceResponse'
- type: array
- title: Task Instances
- total_entries:
- type: integer
- title: Total Entries
- type: object
- required:
- - task_instances
- - total_entries
- title: TaskInstanceReferenceCollectionResponse
- description: Task Instance Reference collection serializer for responses.
- TaskInstanceReferenceResponse:
- properties:
- task_id:
- type: string
- title: Task Id
- dag_run_id:
- type: string
- title: Dag Run Id
- dag_id:
- type: string
- title: Dag Id
- type: object
- required:
- - task_id
- - dag_run_id
- - dag_id
- title: TaskInstanceReferenceResponse
- description: Task Instance Reference serializer for responses.
TaskInstanceResponse:
properties:
id:
diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 9eaf1913747..91e4cb0ddcc 100644
--- a/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -55,8 +55,6 @@ from airflow.api_fastapi.core_api.datamodels.task_instances import (
TaskInstanceCollectionResponse,
TaskInstanceHistoryCollectionResponse,
TaskInstanceHistoryResponse,
- TaskInstanceReferenceCollectionResponse,
- TaskInstanceReferenceResponse,
TaskInstanceResponse,
TaskInstancesBatchBody,
)
@@ -550,7 +548,7 @@ def post_clear_task_instances(
request: Request,
body: ClearTaskInstancesBody,
session: SessionDep,
-) -> TaskInstanceReferenceCollectionResponse:
+) -> TaskInstanceCollectionResponse:
"""Clear task instances."""
dag = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
@@ -597,6 +595,7 @@ def post_clear_task_instances(
dry_run=True,
task_ids=task_ids,
dag_bag=request.app.state.dag_bag,
+ session=session,
**body.model_dump(
include={
"start_date",
@@ -615,9 +614,9 @@ def post_clear_task_instances(
DagRunState.QUEUED if reset_dag_runs else False,
)
- return TaskInstanceReferenceCollectionResponse(
+ return TaskInstanceCollectionResponse(
task_instances=[
- TaskInstanceReferenceResponse.model_validate(
+ TaskInstanceResponse.model_validate(
ti,
from_attributes=True,
)
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index e5d6d08240f..9a816460704 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -2980,7 +2980,7 @@ export const useTaskInstanceServiceGetTaskInstancesBatch = <
* @param data The data for the request.
* @param data.dagId
* @param data.requestBody
- * @returns TaskInstanceReferenceCollectionResponse Successful Response
+ * @returns TaskInstanceCollectionResponse Successful Response
* @throws ApiError
*/
export const useTaskInstanceServicePostClearTaskInstances = <
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 9d59d4fd59a..64a8fcc9e81 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -4303,47 +4303,6 @@ export const $TaskInstanceHistoryResponse = {
description: "TaskInstanceHistory serializer for responses.",
} as const;
-export const $TaskInstanceReferenceCollectionResponse = {
- properties: {
- task_instances: {
- items: {
- $ref: "#/components/schemas/TaskInstanceReferenceResponse",
- },
- type: "array",
- title: "Task Instances",
- },
- total_entries: {
- type: "integer",
- title: "Total Entries",
- },
- },
- type: "object",
- required: ["task_instances", "total_entries"],
- title: "TaskInstanceReferenceCollectionResponse",
- description: "Task Instance Reference collection serializer for responses.",
-} as const;
-
-export const $TaskInstanceReferenceResponse = {
- properties: {
- task_id: {
- type: "string",
- title: "Task Id",
- },
- dag_run_id: {
- type: "string",
- title: "Dag Run Id",
- },
- dag_id: {
- type: "string",
- title: "Dag Id",
- },
- },
- type: "object",
- required: ["task_id", "dag_run_id", "dag_id"],
- title: "TaskInstanceReferenceResponse",
- description: "Task Instance Reference serializer for responses.",
-} as const;
-
export const $TaskInstanceResponse = {
properties: {
id: {
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index ee8b5ab70c2..01666ae0904 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -2394,7 +2394,7 @@ export class TaskInstanceService {
* @param data The data for the request.
* @param data.dagId
* @param data.requestBody
- * @returns TaskInstanceReferenceCollectionResponse Successful Response
+ * @returns TaskInstanceCollectionResponse Successful Response
* @throws ApiError
*/
public static postClearTaskInstances(
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index f9dad9f4eda..232e66d2464 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1050,23 +1050,6 @@ export type TaskInstanceHistoryResponse = {
executor_config: string;
};
-/**
- * Task Instance Reference collection serializer for responses.
- */
-export type TaskInstanceReferenceCollectionResponse = {
- task_instances: Array<TaskInstanceReferenceResponse>;
- total_entries: number;
-};
-
-/**
- * Task Instance Reference serializer for responses.
- */
-export type TaskInstanceReferenceResponse = {
- task_id: string;
- dag_run_id: string;
- dag_id: string;
-};
-
/**
* TaskInstance serializer for responses.
*/
@@ -1984,7 +1967,7 @@ export type PostClearTaskInstancesData = {
requestBody: ClearTaskInstancesBody;
};
-export type PostClearTaskInstancesResponse = TaskInstanceReferenceCollectionResponse;
+export type PostClearTaskInstancesResponse = TaskInstanceCollectionResponse;
export type GetLogData = {
accept?: "application/json" | "text/plain" | "*/*";
@@ -4021,7 +4004,7 @@ export type $OpenApiTs = {
/**
* Successful Response
*/
- 200: TaskInstanceReferenceCollectionResponse;
+ 200: TaskInstanceCollectionResponse;
/**
* Unauthorized
*/
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index d62d3794434..eb07ae8bab6 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -1878,7 +1878,7 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
)
assert response.status_code == 200
- # dag_id (3rd argument) is a different session object. Manually asserting that the dag_id
+ # dag (3rd argument) is a different session object. Manually asserting that the dag_id
# is the same.
mock_clearti.assert_called_once_with([], mock.ANY, mock.ANY, DagRunState.QUEUED)
assert mock_clearti.call_args[0][2].dag_id == dag_id
@@ -1982,7 +1982,9 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
},
]
for task_instance in expected_response:
- assert task_instance in response.json()["task_instances"]
+ assert task_instance in [
+ {key: ti[key] for key in task_instance.keys()} for ti in response.json()["task_instances"]
+ ]
assert response.json()["total_entries"] == 6
assert failed_dag_runs == 0
@@ -2037,6 +2039,32 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
"dag_id": "example_python_operator",
"dag_run_id": "TEST_DAG_RUN_ID_0",
"task_id": "print_the_context",
+ "duration": mock.ANY,
+ "end_date": mock.ANY,
+ "executor": None,
+ "executor_config": "{}",
+ "hostname": "",
+ "id": mock.ANY,
+ "logical_date": "2020-01-01T00:00:00Z",
+ "map_index": -1,
+ "max_tries": 0,
+ "note": "placeholder-note",
+ "operator": "PythonOperator",
+ "pid": 100,
+ "pool": "default_pool",
+ "pool_slots": 1,
+ "priority_weight": 9,
+ "queue": "default_queue",
+ "queued_when": None,
+ "rendered_fields": {},
+ "rendered_map_index": None,
+ "start_date": "2020-01-02T00:00:00Z",
+ "state": "restarting",
+ "task_display_name": "print_the_context",
+ "trigger": None,
+ "triggerer_job": None,
+ "try_number": 0,
+ "unixname": mock.ANY,
},
]
assert response.json()["task_instances"] == expected_response
@@ -2121,7 +2149,9 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
},
]
for task_instance in expected_response:
- assert task_instance in response.json()["task_instances"]
+ assert task_instance in [
+ {key: ti[key] for key in task_instance.keys()} for ti in response.json()["task_instances"]
+ ]
assert response.json()["total_entries"] == 6
def test_should_respond_200_with_include_future(self, test_client, session):
@@ -2204,7 +2234,9 @@ class TestPostClearTaskInstances(TestTaskInstanceEndpoint):
},
]
for task_instance in expected_response:
- assert task_instance in response.json()["task_instances"]
+ assert task_instance in [
+ {key: ti[key] for key in task_instance.keys()} for ti in response.json()["task_instances"]
+ ]
assert response.json()["total_entries"] == 6
def test_should_respond_404_for_nonexistent_dagrun_id(self, test_client, session):
@@ -2339,7 +2371,6 @@ class TestGetTaskInstanceTries(TestTaskInstanceEndpoint):
self.create_task_instances(
session=session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True
)
- print("here")
response = test_client.get(
"/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries"
)