This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit aa36b13c23f8d98e7d5bbb8e43ae886332319989 Author: Guan Ming(Wesley) Chiu <[email protected]> AuthorDate: Tue Sep 16 03:12:45 2025 +0800 Add map_index filter to TaskInstance API queries (#55614) * Add QueryTIMapIndexFilter to API parameters for task instance queries * Update test case to prevent confusion (cherry picked from commit 62fbe0178a1361b84c931451783c7a68ac578058) --- .../src/airflow/api_fastapi/common/parameters.py | 9 +++++++++ .../core_api/openapi/v2-rest-api-generated.yaml | 16 ++++++++++++++++ .../core_api/routes/public/task_instances.py | 5 +++++ .../src/airflow/ui/openapi-gen/queries/common.ts | 10 ++++++---- .../airflow/ui/openapi-gen/queries/ensureQueryData.ts | 12 ++++++++---- .../src/airflow/ui/openapi-gen/queries/prefetch.ts | 12 ++++++++---- .../src/airflow/ui/openapi-gen/queries/queries.ts | 12 ++++++++---- .../src/airflow/ui/openapi-gen/queries/suspense.ts | 12 ++++++++---- .../airflow/ui/openapi-gen/requests/services.gen.ts | 4 ++++ .../src/airflow/ui/openapi-gen/requests/types.gen.ts | 2 ++ .../airflow/ui/src/layouts/Details/Gantt/Gantt.tsx | 3 ++- .../core_api/routes/public/test_task_instances.py | 19 +++++++++++++++++++ 12 files changed, 95 insertions(+), 21 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index bef1659c12d..cc7ae250cba 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -902,6 +902,15 @@ QueryTIOperatorFilter = Annotated[ ), ] +QueryTIMapIndexFilter = Annotated[ + FilterParam[list[int]], + Depends( + filter_param_factory( + TaskInstance.map_index, list[int], FilterOptionEnum.ANY_EQUAL, default_factory=list + ) + ), +] + # XCom QueryXComKeyPatternSearch = Annotated[ _SearchParam, Depends(search_param_factory(XComModel.key, "xcom_key_pattern")) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 7b28e615e10..7497456b84a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -5715,6 +5715,14 @@ paths: items: type: string title: Operator + - name: map_index + in: query + required: false + schema: + type: array + items: + type: integer + title: Map Index - name: limit in: query required: false @@ -6515,6 +6523,14 @@ paths: items: type: string title: Operator + - name: map_index + in: query + required: false + schema: + type: array + items: + type: integer + title: Map Index - name: limit in: query required: false diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 87d76fedc92..28831ac01e6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -42,6 +42,7 @@ from airflow.api_fastapi.common.parameters import ( QueryOffset, QueryTIDagVersionFilter, QueryTIExecutorFilter, + QueryTIMapIndexFilter, QueryTIOperatorFilter, QueryTIPoolFilter, QueryTIQueueFilter, @@ -149,6 +150,7 @@ def get_mapped_task_instances( version_number: QueryTIDagVersionFilter, try_number: QueryTITryNumberFilter, operator: QueryTIOperatorFilter, + map_index: QueryTIMapIndexFilter, limit: QueryLimit, offset: QueryOffset, order_by: Annotated[ @@ -220,6 +222,7 @@ def get_mapped_task_instances( version_number, try_number, operator, + map_index, ], order_by=order_by, offset=offset, @@ -412,6 +415,7 @@ def get_task_instances( version_number: QueryTIDagVersionFilter, try_number: QueryTITryNumberFilter, operator: QueryTIOperatorFilter, + map_index: QueryTIMapIndexFilter, limit: QueryLimit, offset: QueryOffset, order_by: Annotated[ @@ -491,6 +495,7 @@ def get_task_instances( readable_ti_filter, try_number, operator, + map_index, ], order_by=order_by, offset=offset, diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts index 5e04cd63524..6a237a0c8bb 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts @@ -382,7 +382,7 @@ export const UseTaskInstanceServiceGetTaskInstanceKeyFn = ({ dagId, dagRunId, ta export type TaskInstanceServiceGetMappedTaskInstancesDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getMappedTaskInstances>>; export type TaskInstanceServiceGetMappedTaskInstancesQueryResult<TData = TaskInstanceServiceGetMappedTaskInstancesDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>; export const useTaskInstanceServiceGetMappedTaskInstancesKey = "TaskInstanceServiceGetMappedTaskInstances"; -export const UseTaskInstanceServiceGetMappedTaskInstancesKeyFn = ({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte, versionN [...] +export const UseTaskInstanceServiceGetMappedTaskInstancesKeyFn = ({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte [...] dagId: string; dagRunId: string; durationGt?: number; @@ -399,6 +399,7 @@ export const UseTaskInstanceServiceGetMappedTaskInstancesKeyFn = ({ dagId, dagRu logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -420,7 +421,7 @@ export const UseTaskInstanceServiceGetMappedTaskInstancesKeyFn = ({ dagId, dagRu updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetMappedTaskInstancesKey, ...(queryKey ?? [{ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updatedAtGte, u [...] +}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetMappedTaskInstancesKey, ...(queryKey ?? [{ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updat [...] export type TaskInstanceServiceGetTaskInstanceDependenciesByMapIndexDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getTaskInstanceDependenciesByMapIndex>>; export type TaskInstanceServiceGetTaskInstanceDependenciesByMapIndexQueryResult<TData = TaskInstanceServiceGetTaskInstanceDependenciesByMapIndexDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>; export const useTaskInstanceServiceGetTaskInstanceDependenciesByMapIndexKey = "TaskInstanceServiceGetTaskInstanceDependenciesByMapIndex"; @@ -469,7 +470,7 @@ export const UseTaskInstanceServiceGetMappedTaskInstanceKeyFn = ({ dagId, dagRun export type TaskInstanceServiceGetTaskInstancesDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getTaskInstances>>; export type TaskInstanceServiceGetTaskInstancesQueryResult<TData = TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>; export const useTaskInstanceServiceGetTaskInstancesKey = "TaskInstanceServiceGetTaskInstances"; -export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updatedAtLt, upda [...] +export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, updatedAtGte, updated [...] dagId: string; dagRunId: string; durationGt?: number; @@ -486,6 +487,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId, d logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -508,7 +510,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId, d updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetTaskInstancesKey, ...(queryKey ?? [{ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtG [...] +}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetTaskInstancesKey, ...(queryKey ?? [{ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, [...] export type TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getTaskInstanceTryDetails>>; export type TaskInstanceServiceGetTaskInstanceTryDetailsQueryResult<TData = TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>; export const useTaskInstanceServiceGetTaskInstanceTryDetailsKey = "TaskInstanceServiceGetTaskInstanceTryDetails"; diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts index beb84ec2793..b18892b9799 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts @@ -750,13 +750,14 @@ export const ensureUseTaskInstanceServiceGetTaskInstanceData = (queryClient: Que * @param data.versionNumber * @param data.tryNumber * @param data.operator +* @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseTaskInstanceServiceGetMappedTaskInstancesData = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updatedAtGte, upda [...] +export const ensureUseTaskInstanceServiceGetMappedTaskInstancesData = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updatedA [...] dagId: string; dagRunId: string; durationGt?: number; @@ -773,6 +774,7 @@ export const ensureUseTaskInstanceServiceGetMappedTaskInstancesData = (queryClie logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -794,7 +796,7 @@ export const ensureUseTaskInstanceServiceGetMappedTaskInstancesData = (queryClie updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updatedAtGte [...] +}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, up [...] /** * Get Task Instance Dependencies * Get dependencies blocking task from getting scheduled. @@ -921,13 +923,14 @@ export const ensureUseTaskInstanceServiceGetMappedTaskInstanceData = (queryClien * @param data.versionNumber * @param data.tryNumber * @param data.operator +* @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, [...] +export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, upd [...] dagId: string; dagRunId: string; durationGt?: number; @@ -944,6 +947,7 @@ export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: Qu logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -966,7 +970,7 @@ export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: Qu updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updated [...] +}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumbe [...] /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts index 7eba9bf7b73..4921d78c74a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts @@ -750,13 +750,14 @@ export const prefetchUseTaskInstanceServiceGetTaskInstance = (queryClient: Query * @param data.versionNumber * @param data.tryNumber * @param data.operator +* @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseTaskInstanceServiceGetMappedTaskInstances = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updatedAtGte, update [...] +export const prefetchUseTaskInstanceServiceGetMappedTaskInstances = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updatedAtG [...] dagId: string; dagRunId: string; durationGt?: number; @@ -773,6 +774,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstances = (queryClient logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -794,7 +796,7 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstances = (queryClient updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, updatedAtGte, [...] +}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskId, tryNumber, updatedAtGt, upda [...] /** * Get Task Instance Dependencies * Get dependencies blocking task from getting scheduled. @@ -921,13 +923,14 @@ export const prefetchUseTaskInstanceServiceGetMappedTaskInstance = (queryClient: * @param data.versionNumber * @param data.tryNumber * @param data.operator +* @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAtGt, up [...] +export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updat [...] dagId: string; dagRunId: string; durationGt?: number; @@ -944,6 +947,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: Quer logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -966,7 +970,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient: Quer updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, updatedAt [...] +}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId, tryNumber, [...] /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index 31a2a9eb6b5..87263738664 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -750,13 +750,14 @@ export const useTaskInstanceServiceGetTaskInstance = <TData = Common.TaskInstanc * @param data.versionNumber * @param data.tryNumber * @param data.operator +* @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const useTaskInstanceServiceGetMappedTaskInstances = <TData = Common.TaskInstanceServiceGetMappedTaskInstancesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateG [...] +export const useTaskInstanceServiceGetMappedTaskInstances = <TData = Common.TaskInstanceServiceGetMappedTaskInstancesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, [...] dagId: string; dagRunId: string; durationGt?: number; @@ -773,6 +774,7 @@ export const useTaskInstanceServiceGetMappedTaskInstances = <TData = Common.Task logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -794,7 +796,7 @@ export const useTaskInstanceServiceGetMappedTaskInstances = <TData = Common.Task updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, st [...] +}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, start [...] /** * Get Task Instance Dependencies * Get dependencies blocking task from getting scheduled. @@ -921,13 +923,14 @@ export const useTaskInstanceServiceGetMappedTaskInstance = <TData = Common.TaskI * @param data.versionNumber * @param data.tryNumber * @param data.operator +* @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const useTaskInstanceServiceGetTaskInstances = <TData = Common.TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDate [...] +export const useTaskInstanceServiceGetTaskInstances = <TData = Common.TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, [...] dagId: string; dagRunId: string; durationGt?: number; @@ -944,6 +947,7 @@ export const useTaskInstanceServiceGetTaskInstances = <TData = Common.TaskInstan logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -966,7 +970,7 @@ export const useTaskInstanceServiceGetTaskInstances = <TData = Common.TaskInstan updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, startDat [...] +}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt [...] /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts index bb5332268da..a2c61b40231 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts @@ -750,13 +750,14 @@ export const useTaskInstanceServiceGetTaskInstanceSuspense = <TData = Common.Tas * @param data.versionNumber * @param data.tryNumber * @param data.operator +* @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const useTaskInstanceServiceGetMappedTaskInstancesSuspense = <TData = Common.TaskInstanceServiceGetMappedTaskInstancesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, st [...] +export const useTaskInstanceServiceGetMappedTaskInstancesSuspense = <TData = Common.TaskInstanceServiceGetMappedTaskInstancesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAf [...] dagId: string; dagRunId: string; durationGt?: number; @@ -773,6 +774,7 @@ export const useTaskInstanceServiceGetMappedTaskInstancesSuspense = <TData = Com logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -794,7 +796,7 @@ export const useTaskInstanceServiceGetMappedTaskInstancesSuspense = <TData = Com updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDa [...] +}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetMappedTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLt [...] /** * Get Task Instance Dependencies * Get dependencies blocking task from getting scheduled. @@ -921,13 +923,14 @@ export const useTaskInstanceServiceGetMappedTaskInstanceSuspense = <TData = Comm * @param data.versionNumber * @param data.tryNumber * @param data.operator +* @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ -export const useTaskInstanceServiceGetTaskInstancesSuspense = <TData = Common.TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, s [...] +export const useTaskInstanceServiceGetTaskInstancesSuspense = <TData = Common.TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, star [...] dagId: string; dagRunId: string; durationGt?: number; @@ -944,6 +947,7 @@ export const useTaskInstanceServiceGetTaskInstancesSuspense = <TData = Common.Ta logicalDateGte?: string; logicalDateLt?: string; logicalDateLte?: string; + mapIndex?: number[]; offset?: number; operator?: string[]; orderBy?: string[]; @@ -966,7 +970,7 @@ export const useTaskInstanceServiceGetTaskInstancesSuspense = <TData = Common.Ta updatedAtLt?: string; updatedAtLte?: string; versionNumber?: number[]; -}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt, [...] +}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId, durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, sta [...] /** * Get Task Instance Try Details * Get task instance details by try number. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index d9abbf77eb9..7bf0a9aede3 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -2053,6 +2053,7 @@ export class TaskInstanceService { * @param data.versionNumber * @param data.tryNumber * @param data.operator + * @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy @@ -2100,6 +2101,7 @@ export class TaskInstanceService { version_number: data.versionNumber, try_number: data.tryNumber, operator: data.operator, + map_index: data.mapIndex, limit: data.limit, offset: data.offset, order_by: data.orderBy @@ -2347,6 +2349,7 @@ export class TaskInstanceService { * @param data.versionNumber * @param data.tryNumber * @param data.operator + * @param data.mapIndex * @param data.limit * @param data.offset * @param data.orderBy @@ -2395,6 +2398,7 @@ export class TaskInstanceService { version_number: data.versionNumber, try_number: data.tryNumber, operator: data.operator, + map_index: data.mapIndex, limit: data.limit, offset: data.offset, order_by: data.orderBy diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 2f2789b2126..49ecfb4a71a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2613,6 +2613,7 @@ export type GetMappedTaskInstancesData = { logicalDateGte?: string | null; logicalDateLt?: string | null; logicalDateLte?: string | null; + mapIndex?: Array<(number)>; offset?: number; operator?: Array<(string)>; orderBy?: Array<(string)>; @@ -2711,6 +2712,7 @@ export type GetTaskInstancesData = { logicalDateGte?: string | null; logicalDateLt?: string | null; logicalDateLte?: string | null; + mapIndex?: Array<(number)>; offset?: number; operator?: Array<(string)>; orderBy?: Array<(string)>; diff --git a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx index d06aa269033..ff522f06376 100644 --- a/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx +++ b/airflow-core/src/airflow/ui/src/layouts/Details/Gantt/Gantt.tsx @@ -111,11 +111,12 @@ export const Gantt = ({ limit }: Props) => { state: selectedRun?.state, }); - // Get individual task instances for tasks (which have start/end times) + // Get non mapped task instances for tasks (which have start/end times) const { data: taskInstancesData, isLoading: tiLoading } = useTaskInstanceServiceGetTaskInstances( { dagId, dagRunId: runId, + mapIndex: [-1], }, undefined, { diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 2c10ef1e341..191276ea74b 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -884,6 +884,8 @@ class TestGetMappedTaskInstances: ({"queue": "test_queue"}, 0, 0), ({"executor": "default"}, 3, 3), ({"executor": "no_exec"}, 0, 0), + ({"map_index": [0, 1]}, 2, 2), + ({"map_index": [5]}, 0, 0), ], ) def test_mapped_task_instances_filters( @@ -1225,6 +1227,23 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint): 5, id="test operator type filter filter", ), + pytest.param( + [ + {"map_index": 0}, + {"map_index": 1}, + {"map_index": 2}, + {"map_index": 3}, + {"map_index": 4}, + {"map_index": 5}, + {"map_index": 6}, + {"map_index": 7}, + ], + True, + ("/dags/~/dagRuns/~/taskInstances"), + {"map_index": [0, 1]}, + 2, + id="test map_index filter", + ), ], ) @pytest.mark.usefixtures("make_dag_with_multiple_versions")
