This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d11a504c8ec AIP-84 Get Mapped Task Instance Try Details (#44206)
d11a504c8ec is described below
commit d11a504c8ecd4d3024ae60cad9e8efc2fb642ba4
Author: kandharvishnu <[email protected]>
AuthorDate: Wed Nov 20 22:22:07 2024 +0530
AIP-84 Get Mapped Task Instance Try Details (#44206)
* adding get_mapped_task_instance_try_details
* dummy change
* revert dummy change
* removing print statements
---------
Co-authored-by: kandharvishnuu
<[email protected]>
---
.../endpoints/task_instance_endpoint.py | 1 +
.../api_fastapi/core_api/openapi/v1-generated.yaml | 68 ++++++++++++++++++++++
.../core_api/routes/public/task_instances.py | 22 +++++++
airflow/ui/openapi-gen/queries/common.ts | 29 +++++++++
airflow/ui/openapi-gen/queries/prefetch.ts | 40 +++++++++++++
airflow/ui/openapi-gen/queries/queries.ts | 47 +++++++++++++++
airflow/ui/openapi-gen/queries/suspense.ts | 47 +++++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 35 +++++++++++
airflow/ui/openapi-gen/requests/types.gen.ts | 38 ++++++++++++
.../core_api/routes/public/test_task_instances.py | 62 ++++++++++++++++++++
10 files changed, 389 insertions(+)
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index 00eb51bae10..f4f7ac23cb8 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -756,6 +756,7 @@ def get_task_instance_try_details(
return task_instance_history_schema.dump(result[0])
+@mark_fastapi_migration_done
@provide_session
def get_mapped_task_instance_try_details(
*,
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index c1aadeb6b6d..c58bf1c6be4 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -3961,6 +3961,74 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}:
+ get:
+ tags:
+ - Task Instance
+ summary: Get Mapped Task Instance Try Details
+ operationId: get_mapped_task_instance_try_details
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ - name: task_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Task Id
+ - name: task_try_number
+ in: path
+ required: true
+ schema:
+ type: integer
+ title: Task Try Number
+ - name: map_index
+ in: path
+ required: true
+ schema:
+ type: integer
+ title: Map Index
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/TaskInstanceHistoryResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/tasks/:
get:
tags:
diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py
b/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 9e4ea49e088..f4769a981b8 100644
--- a/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -460,3 +460,25 @@ def get_task_instance_try_details(
f"The Task Instance with dag_id: `{dag_id}`, run_id:
`{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and
map_index: `{map_index}` was not found",
)
return TaskInstanceHistoryResponse.model_validate(result,
from_attributes=True)
+
+
+@task_instances_router.get(
+ "/{task_id}/{map_index}/tries/{task_try_number}",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+)
+def get_mapped_task_instance_try_details(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ task_try_number: int,
+ session: Annotated[Session, Depends(get_session)],
+ map_index: int,
+) -> TaskInstanceHistoryResponse:
+ return get_task_instance_try_details(
+ dag_id=dag_id,
+ dag_run_id=dag_run_id,
+ task_id=task_id,
+ task_try_number=task_try_number,
+ map_index=map_index,
+ session=session,
+ )
diff --git a/airflow/ui/openapi-gen/queries/common.ts
b/airflow/ui/openapi-gen/queries/common.ts
index ea4e65d44c9..fe281c5640f 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1092,6 +1092,35 @@ export const
UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn = (
useTaskInstanceServiceGetTaskInstanceTryDetailsKey,
...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
];
+export type TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse =
+ Awaited<
+ ReturnType<typeof TaskInstanceService.getMappedTaskInstanceTryDetails>
+ >;
+export type TaskInstanceServiceGetMappedTaskInstanceTryDetailsQueryResult<
+ TData = TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse,
+ TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey =
+ "TaskInstanceServiceGetMappedTaskInstanceTryDetails";
+export const UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn = (
+ {
+ dagId,
+ dagRunId,
+ mapIndex,
+ taskId,
+ taskTryNumber,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ mapIndex: number;
+ taskId: string;
+ taskTryNumber: number;
+ },
+ queryKey?: Array<unknown>,
+) => [
+ useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey,
+ ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
+];
export type TaskServiceGetTasksDefaultResponse = Awaited<
ReturnType<typeof TaskService.getTasks>
>;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow/ui/openapi-gen/queries/prefetch.ts
index 273882c787d..f7872fcc7f8 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1471,6 +1471,46 @@ export const
prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = (
taskTryNumber,
}),
});
+/**
+ * Get Mapped Task Instance Try Details
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.taskTryNumber
+ * @param data.mapIndex
+ * @returns TaskInstanceHistoryResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTryDetails = (
+ queryClient: QueryClient,
+ {
+ dagId,
+ dagRunId,
+ mapIndex,
+ taskId,
+ taskTryNumber,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ mapIndex: number;
+ taskId: string;
+ taskTryNumber: number;
+ },
+) =>
+ queryClient.prefetchQuery({
+ queryKey:
Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn(
+ { dagId, dagRunId, mapIndex, taskId, taskTryNumber },
+ ),
+ queryFn: () =>
+ TaskInstanceService.getMappedTaskInstanceTryDetails({
+ dagId,
+ dagRunId,
+ mapIndex,
+ taskId,
+ taskTryNumber,
+ }),
+ });
/**
* Get Tasks
* Get tasks for DAG.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts
b/airflow/ui/openapi-gen/queries/queries.ts
index 386575204aa..74e25c0258a 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -1762,6 +1762,53 @@ export const
useTaskInstanceServiceGetTaskInstanceTryDetails = <
}) as TData,
...options,
});
+/**
+ * Get Mapped Task Instance Try Details
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.taskTryNumber
+ * @param data.mapIndex
+ * @returns TaskInstanceHistoryResponse Successful Response
+ * @throws ApiError
+ */
+export const useTaskInstanceServiceGetMappedTaskInstanceTryDetails = <
+ TData =
Common.TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ mapIndex,
+ taskId,
+ taskTryNumber,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ mapIndex: number;
+ taskId: string;
+ taskTryNumber: number;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useQuery<TData, TError>({
+ queryKey:
Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn(
+ { dagId, dagRunId, mapIndex, taskId, taskTryNumber },
+ queryKey,
+ ),
+ queryFn: () =>
+ TaskInstanceService.getMappedTaskInstanceTryDetails({
+ dagId,
+ dagRunId,
+ mapIndex,
+ taskId,
+ taskTryNumber,
+ }) as TData,
+ ...options,
+ });
/**
* Get Tasks
* Get tasks for DAG.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow/ui/openapi-gen/queries/suspense.ts
index 3d57a08a69e..87b1a7aa6a2 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1744,6 +1744,53 @@ export const
useTaskInstanceServiceGetTaskInstanceTryDetailsSuspense = <
}) as TData,
...options,
});
+/**
+ * Get Mapped Task Instance Try Details
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.taskTryNumber
+ * @param data.mapIndex
+ * @returns TaskInstanceHistoryResponse Successful Response
+ * @throws ApiError
+ */
+export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsSuspense = <
+ TData =
Common.TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse,
+ TError = unknown,
+ TQueryKey extends Array<unknown> = unknown[],
+>(
+ {
+ dagId,
+ dagRunId,
+ mapIndex,
+ taskId,
+ taskTryNumber,
+ }: {
+ dagId: string;
+ dagRunId: string;
+ mapIndex: number;
+ taskId: string;
+ taskTryNumber: number;
+ },
+ queryKey?: TQueryKey,
+ options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+ useSuspenseQuery<TData, TError>({
+ queryKey:
Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn(
+ { dagId, dagRunId, mapIndex, taskId, taskTryNumber },
+ queryKey,
+ ),
+ queryFn: () =>
+ TaskInstanceService.getMappedTaskInstanceTryDetails({
+ dagId,
+ dagRunId,
+ mapIndex,
+ taskId,
+ taskTryNumber,
+ }) as TData,
+ ...options,
+ });
/**
* Get Tasks
* Get tasks for DAG.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow/ui/openapi-gen/requests/services.gen.ts
index 7c0c348c753..908a715ab69 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -125,6 +125,8 @@ import type {
GetTaskInstancesBatchResponse,
GetTaskInstanceTryDetailsData,
GetTaskInstanceTryDetailsResponse,
+ GetMappedTaskInstanceTryDetailsData,
+ GetMappedTaskInstanceTryDetailsResponse,
GetTasksData,
GetTasksResponse,
GetTaskData,
@@ -2095,6 +2097,39 @@ export class TaskInstanceService {
},
});
}
+
+ /**
+ * Get Mapped Task Instance Try Details
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.taskTryNumber
+ * @param data.mapIndex
+ * @returns TaskInstanceHistoryResponse Successful Response
+ * @throws ApiError
+ */
+ public static getMappedTaskInstanceTryDetails(
+ data: GetMappedTaskInstanceTryDetailsData,
+ ): CancelablePromise<GetMappedTaskInstanceTryDetailsResponse> {
+ return __request(OpenAPI, {
+ method: "GET",
+ url:
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}",
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ task_id: data.taskId,
+ task_try_number: data.taskTryNumber,
+ map_index: data.mapIndex,
+ },
+ errors: {
+ 401: "Unauthorized",
+ 403: "Forbidden",
+ 404: "Not Found",
+ 422: "Validation Error",
+ },
+ });
+ }
}
export class TaskService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow/ui/openapi-gen/requests/types.gen.ts
index e78e3a18685..4a9d46d3c21 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1595,6 +1595,17 @@ export type GetTaskInstanceTryDetailsData = {
export type GetTaskInstanceTryDetailsResponse = TaskInstanceHistoryResponse;
+export type GetMappedTaskInstanceTryDetailsData = {
+ dagId: string;
+ dagRunId: string;
+ mapIndex: number;
+ taskId: string;
+ taskTryNumber: number;
+};
+
+export type GetMappedTaskInstanceTryDetailsResponse =
+ TaskInstanceHistoryResponse;
+
export type GetTasksData = {
dagId: string;
orderBy?: string;
@@ -3289,6 +3300,33 @@ export type $OpenApiTs = {
};
};
};
+
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}":
{
+ get: {
+ req: GetMappedTaskInstanceTryDetailsData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: TaskInstanceHistoryResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
"/public/dags/{dag_id}/tasks/": {
get: {
req: GetTasksData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index 56e2ee5e0e1..f8e75600171 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -1497,6 +1497,68 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
"dag_run_id": "TEST_DAG_RUN_ID",
}
+ @pytest.mark.parametrize("try_number", [1, 2])
+ def test_should_respond_200_with_mapped_task_at_different_try_numbers(
+ self, test_client, try_number, session
+ ):
+ tis = self.create_task_instances(session, task_instances=[{"state":
State.FAILED}])
+ old_ti = tis[0]
+ for idx in (1, 2):
+ ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id,
map_index=idx)
+ ti.rendered_task_instance_fields = RTIF(ti, render_templates=False)
+ ti.try_number = 1
+ for attr in ["duration", "end_date", "pid", "start_date", "state",
"queue", "note"]:
+ setattr(ti, attr, getattr(old_ti, attr))
+ session.add(ti)
+ session.commit()
+ tis = session.query(TaskInstance).all()
+ # Record the task instance history
+ from airflow.models.taskinstance import clear_task_instances
+
+ clear_task_instances(tis, session)
+ # Simulate the try_number increasing to new values in TI
+ for ti in tis:
+ if ti.map_index > 0:
+ ti.try_number += 1
+ ti.queue = "default_queue"
+ session.merge(ti)
+ session.commit()
+ tis = session.query(TaskInstance).all()
+ # in each loop, we should get the right mapped TI back
+ for map_index in (1, 2):
+ # Get the info from TIHistory: try_number 1, try_number 2 is TI
table(latest)
+ # TODO: Add "REMOTE_USER": "test" as per legacy code after adding
Authentication
+ response = test_client.get(
+
"/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances"
+ f"/print_the_context/{map_index}/tries/{try_number}",
+ )
+ assert response.status_code == 200
+
+ assert response.json() == {
+ "dag_id": "example_python_operator",
+ "duration": 10000.0,
+ "end_date": "2020-01-03T00:00:00Z",
+ "executor": None,
+ "executor_config": "{}",
+ "hostname": "",
+ "map_index": map_index,
+ "max_tries": 0 if try_number == 1 else 1,
+ "operator": "PythonOperator",
+ "pid": 100,
+ "pool": "default_pool",
+ "pool_slots": 1,
+ "priority_weight": 9,
+ "queue": "default_queue",
+ "queued_when": None,
+ "start_date": "2020-01-02T00:00:00Z",
+ "state": "failed" if try_number == 1 else None,
+ "task_id": "print_the_context",
+ "task_display_name": "print_the_context",
+ "try_number": try_number,
+ "unixname": getuser(),
+ "dag_run_id": "TEST_DAG_RUN_ID",
+ }
+
def test_should_respond_200_with_task_state_in_deferred(self, test_client,
session):
now = pendulum.now("UTC")
ti = self.create_task_instances(