This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d11a504c8ec AIP-84 Get Mapped Task Instance Try Details (#44206)
d11a504c8ec is described below

commit d11a504c8ecd4d3024ae60cad9e8efc2fb642ba4
Author: kandharvishnu <[email protected]>
AuthorDate: Wed Nov 20 22:22:07 2024 +0530

    AIP-84 Get Mapped Task Instance Try Details (#44206)
    
    * adding get_mapped_task_instance_try_details
    
    * dummy change
    
    * revert dummy change
    
    * removing print statements
    
    ---------
    
    Co-authored-by: kandharvishnuu <[email protected]>
---
 .../endpoints/task_instance_endpoint.py            |  1 +
 .../api_fastapi/core_api/openapi/v1-generated.yaml | 68 ++++++++++++++++++++++
 .../core_api/routes/public/task_instances.py       | 22 +++++++
 airflow/ui/openapi-gen/queries/common.ts           | 29 +++++++++
 airflow/ui/openapi-gen/queries/prefetch.ts         | 40 +++++++++++++
 airflow/ui/openapi-gen/queries/queries.ts          | 47 +++++++++++++++
 airflow/ui/openapi-gen/queries/suspense.ts         | 47 +++++++++++++++
 airflow/ui/openapi-gen/requests/services.gen.ts    | 35 +++++++++++
 airflow/ui/openapi-gen/requests/types.gen.ts       | 38 ++++++++++++
 .../core_api/routes/public/test_task_instances.py  | 62 ++++++++++++++++++++
 10 files changed, 389 insertions(+)

diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index 00eb51bae10..f4f7ac23cb8 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -756,6 +756,7 @@ def get_task_instance_try_details(
     return task_instance_history_schema.dump(result[0])
 
 
+@mark_fastapi_migration_done
 @provide_session
 def get_mapped_task_instance_try_details(
     *,
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index c1aadeb6b6d..c58bf1c6be4 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -3961,6 +3961,74 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  
/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}:
+    get:
+      tags:
+      - Task Instance
+      summary: Get Mapped Task Instance Try Details
+      operationId: get_mapped_task_instance_try_details
+      parameters:
+      - name: dag_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Id
+      - name: dag_run_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Dag Run Id
+      - name: task_id
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Task Id
+      - name: task_try_number
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Task Try Number
+      - name: map_index
+        in: path
+        required: true
+        schema:
+          type: integer
+          title: Map Index
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/TaskInstanceHistoryResponse'
+        '401':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Unauthorized
+        '403':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Forbidden
+        '404':
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPExceptionResponse'
+          description: Not Found
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
   /public/dags/{dag_id}/tasks/:
     get:
       tags:
diff --git a/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 9e4ea49e088..f4769a981b8 100644
--- a/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -460,3 +460,25 @@ def get_task_instance_try_details(
             f"The Task Instance with dag_id: `{dag_id}`, run_id: 
`{dag_run_id}`, task_id: `{task_id}`, try_number: `{task_try_number}` and 
map_index: `{map_index}` was not found",
         )
     return TaskInstanceHistoryResponse.model_validate(result, 
from_attributes=True)
+
+
+@task_instances_router.get(
+    "/{task_id}/{map_index}/tries/{task_try_number}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+)
+def get_mapped_task_instance_try_details(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    task_try_number: int,
+    session: Annotated[Session, Depends(get_session)],
+    map_index: int,
+) -> TaskInstanceHistoryResponse:
+    return get_task_instance_try_details(
+        dag_id=dag_id,
+        dag_run_id=dag_run_id,
+        task_id=task_id,
+        task_try_number=task_try_number,
+        map_index=map_index,
+        session=session,
+    )
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index ea4e65d44c9..fe281c5640f 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -1092,6 +1092,35 @@ export const UseTaskInstanceServiceGetTaskInstanceTryDetailsKeyFn = (
   useTaskInstanceServiceGetTaskInstanceTryDetailsKey,
   ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
 ];
+export type TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse =
+  Awaited<
+    ReturnType<typeof TaskInstanceService.getMappedTaskInstanceTryDetails>
+  >;
+export type TaskInstanceServiceGetMappedTaskInstanceTryDetailsQueryResult<
+  TData = TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse,
+  TError = unknown,
+> = UseQueryResult<TData, TError>;
+export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey =
+  "TaskInstanceServiceGetMappedTaskInstanceTryDetails";
+export const UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn = (
+  {
+    dagId,
+    dagRunId,
+    mapIndex,
+    taskId,
+    taskTryNumber,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    mapIndex: number;
+    taskId: string;
+    taskTryNumber: number;
+  },
+  queryKey?: Array<unknown>,
+) => [
+  useTaskInstanceServiceGetMappedTaskInstanceTryDetailsKey,
+  ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, taskTryNumber }]),
+];
 export type TaskServiceGetTasksDefaultResponse = Awaited<
   ReturnType<typeof TaskService.getTasks>
 >;
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index 273882c787d..f7872fcc7f8 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1471,6 +1471,46 @@ export const prefetchUseTaskInstanceServiceGetTaskInstanceTryDetails = (
         taskTryNumber,
       }),
   });
+/**
+ * Get Mapped Task Instance Try Details
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.taskTryNumber
+ * @param data.mapIndex
+ * @returns TaskInstanceHistoryResponse Successful Response
+ * @throws ApiError
+ */
+export const prefetchUseTaskInstanceServiceGetMappedTaskInstanceTryDetails = (
+  queryClient: QueryClient,
+  {
+    dagId,
+    dagRunId,
+    mapIndex,
+    taskId,
+    taskTryNumber,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    mapIndex: number;
+    taskId: string;
+    taskTryNumber: number;
+  },
+) =>
+  queryClient.prefetchQuery({
+    queryKey: 
Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn(
+      { dagId, dagRunId, mapIndex, taskId, taskTryNumber },
+    ),
+    queryFn: () =>
+      TaskInstanceService.getMappedTaskInstanceTryDetails({
+        dagId,
+        dagRunId,
+        mapIndex,
+        taskId,
+        taskTryNumber,
+      }),
+  });
 /**
  * Get Tasks
  * Get tasks for DAG.
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 386575204aa..74e25c0258a 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -1762,6 +1762,53 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetails = <
       }) as TData,
     ...options,
   });
+/**
+ * Get Mapped Task Instance Try Details
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.taskTryNumber
+ * @param data.mapIndex
+ * @returns TaskInstanceHistoryResponse Successful Response
+ * @throws ApiError
+ */
+export const useTaskInstanceServiceGetMappedTaskInstanceTryDetails = <
+  TData = 
Common.TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    dagRunId,
+    mapIndex,
+    taskId,
+    taskTryNumber,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    mapIndex: number;
+    taskId: string;
+    taskTryNumber: number;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useQuery<TData, TError>({
+    queryKey: 
Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn(
+      { dagId, dagRunId, mapIndex, taskId, taskTryNumber },
+      queryKey,
+    ),
+    queryFn: () =>
+      TaskInstanceService.getMappedTaskInstanceTryDetails({
+        dagId,
+        dagRunId,
+        mapIndex,
+        taskId,
+        taskTryNumber,
+      }) as TData,
+    ...options,
+  });
 /**
  * Get Tasks
  * Get tasks for DAG.
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 3d57a08a69e..87b1a7aa6a2 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1744,6 +1744,53 @@ export const useTaskInstanceServiceGetTaskInstanceTryDetailsSuspense = <
       }) as TData,
     ...options,
   });
+/**
+ * Get Mapped Task Instance Try Details
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.taskTryNumber
+ * @param data.mapIndex
+ * @returns TaskInstanceHistoryResponse Successful Response
+ * @throws ApiError
+ */
+export const useTaskInstanceServiceGetMappedTaskInstanceTryDetailsSuspense = <
+  TData = 
Common.TaskInstanceServiceGetMappedTaskInstanceTryDetailsDefaultResponse,
+  TError = unknown,
+  TQueryKey extends Array<unknown> = unknown[],
+>(
+  {
+    dagId,
+    dagRunId,
+    mapIndex,
+    taskId,
+    taskTryNumber,
+  }: {
+    dagId: string;
+    dagRunId: string;
+    mapIndex: number;
+    taskId: string;
+    taskTryNumber: number;
+  },
+  queryKey?: TQueryKey,
+  options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
+) =>
+  useSuspenseQuery<TData, TError>({
+    queryKey: 
Common.UseTaskInstanceServiceGetMappedTaskInstanceTryDetailsKeyFn(
+      { dagId, dagRunId, mapIndex, taskId, taskTryNumber },
+      queryKey,
+    ),
+    queryFn: () =>
+      TaskInstanceService.getMappedTaskInstanceTryDetails({
+        dagId,
+        dagRunId,
+        mapIndex,
+        taskId,
+        taskTryNumber,
+      }) as TData,
+    ...options,
+  });
 /**
  * Get Tasks
  * Get tasks for DAG.
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 7c0c348c753..908a715ab69 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -125,6 +125,8 @@ import type {
   GetTaskInstancesBatchResponse,
   GetTaskInstanceTryDetailsData,
   GetTaskInstanceTryDetailsResponse,
+  GetMappedTaskInstanceTryDetailsData,
+  GetMappedTaskInstanceTryDetailsResponse,
   GetTasksData,
   GetTasksResponse,
   GetTaskData,
@@ -2095,6 +2097,39 @@ export class TaskInstanceService {
       },
     });
   }
+
+  /**
+   * Get Mapped Task Instance Try Details
+   * @param data The data for the request.
+   * @param data.dagId
+   * @param data.dagRunId
+   * @param data.taskId
+   * @param data.taskTryNumber
+   * @param data.mapIndex
+   * @returns TaskInstanceHistoryResponse Successful Response
+   * @throws ApiError
+   */
+  public static getMappedTaskInstanceTryDetails(
+    data: GetMappedTaskInstanceTryDetailsData,
+  ): CancelablePromise<GetMappedTaskInstanceTryDetailsResponse> {
+    return __request(OpenAPI, {
+      method: "GET",
+      url: 
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}",
+      path: {
+        dag_id: data.dagId,
+        dag_run_id: data.dagRunId,
+        task_id: data.taskId,
+        task_try_number: data.taskTryNumber,
+        map_index: data.mapIndex,
+      },
+      errors: {
+        401: "Unauthorized",
+        403: "Forbidden",
+        404: "Not Found",
+        422: "Validation Error",
+      },
+    });
+  }
 }
 
 export class TaskService {
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index e78e3a18685..4a9d46d3c21 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -1595,6 +1595,17 @@ export type GetTaskInstanceTryDetailsData = {
 
 export type GetTaskInstanceTryDetailsResponse = TaskInstanceHistoryResponse;
 
+export type GetMappedTaskInstanceTryDetailsData = {
+  dagId: string;
+  dagRunId: string;
+  mapIndex: number;
+  taskId: string;
+  taskTryNumber: number;
+};
+
+export type GetMappedTaskInstanceTryDetailsResponse =
+  TaskInstanceHistoryResponse;
+
 export type GetTasksData = {
   dagId: string;
   orderBy?: string;
@@ -3289,6 +3300,33 @@ export type $OpenApiTs = {
       };
     };
   };
+  
"/public/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}":
 {
+    get: {
+      req: GetMappedTaskInstanceTryDetailsData;
+      res: {
+        /**
+         * Successful Response
+         */
+        200: TaskInstanceHistoryResponse;
+        /**
+         * Unauthorized
+         */
+        401: HTTPExceptionResponse;
+        /**
+         * Forbidden
+         */
+        403: HTTPExceptionResponse;
+        /**
+         * Not Found
+         */
+        404: HTTPExceptionResponse;
+        /**
+         * Validation Error
+         */
+        422: HTTPValidationError;
+      };
+    };
+  };
   "/public/dags/{dag_id}/tasks/": {
     get: {
       req: GetTasksData;
diff --git a/tests/api_fastapi/core_api/routes/public/test_task_instances.py b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
index 56e2ee5e0e1..f8e75600171 100644
--- a/tests/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/tests/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -1497,6 +1497,68 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint):
             "dag_run_id": "TEST_DAG_RUN_ID",
         }
 
+    @pytest.mark.parametrize("try_number", [1, 2])
+    def test_should_respond_200_with_mapped_task_at_different_try_numbers(
+        self, test_client, try_number, session
+    ):
+        tis = self.create_task_instances(session, task_instances=[{"state": 
State.FAILED}])
+        old_ti = tis[0]
+        for idx in (1, 2):
+            ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, 
map_index=idx)
+            ti.rendered_task_instance_fields = RTIF(ti, render_templates=False)
+            ti.try_number = 1
+            for attr in ["duration", "end_date", "pid", "start_date", "state", 
"queue", "note"]:
+                setattr(ti, attr, getattr(old_ti, attr))
+            session.add(ti)
+        session.commit()
+        tis = session.query(TaskInstance).all()
+        # Record the task instance history
+        from airflow.models.taskinstance import clear_task_instances
+
+        clear_task_instances(tis, session)
+        # Simulate the try_number increasing to new values in TI
+        for ti in tis:
+            if ti.map_index > 0:
+                ti.try_number += 1
+                ti.queue = "default_queue"
+                session.merge(ti)
+        session.commit()
+        tis = session.query(TaskInstance).all()
+        # in each loop, we should get the right mapped TI back
+        for map_index in (1, 2):
+            # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest)
+            # TODO: Add "REMOTE_USER": "test" as per legacy code after adding Authentication
+            response = test_client.get(
+                
"/public/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances"
+                f"/print_the_context/{map_index}/tries/{try_number}",
+            )
+            assert response.status_code == 200
+
+            assert response.json() == {
+                "dag_id": "example_python_operator",
+                "duration": 10000.0,
+                "end_date": "2020-01-03T00:00:00Z",
+                "executor": None,
+                "executor_config": "{}",
+                "hostname": "",
+                "map_index": map_index,
+                "max_tries": 0 if try_number == 1 else 1,
+                "operator": "PythonOperator",
+                "pid": 100,
+                "pool": "default_pool",
+                "pool_slots": 1,
+                "priority_weight": 9,
+                "queue": "default_queue",
+                "queued_when": None,
+                "start_date": "2020-01-02T00:00:00Z",
+                "state": "failed" if try_number == 1 else None,
+                "task_id": "print_the_context",
+                "task_display_name": "print_the_context",
+                "try_number": try_number,
+                "unixname": getuser(),
+                "dag_run_id": "TEST_DAG_RUN_ID",
+            }
+
     def test_should_respond_200_with_task_state_in_deferred(self, test_client, 
session):
         now = pendulum.now("UTC")
         ti = self.create_task_instances(

Reply via email to