This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 452435879e9 Add task group ID filtering support to task instance query (#58092) (#59511)
452435879e9 is described below
commit 452435879e90abf755e8ae06660d3e131860fe8b
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Dec 16 19:20:25 2025 +0100
Add task group ID filtering support to task instance query (#58092) (#59511)
* Add task group ID filtering support to task instance query
* Address the review comments
* Small Adjustment
---------
(cherry picked from commit 8256ee16d1de5b47ad996af82608d2bb836dfe1d)
Co-authored-by: Yiming Peng <[email protected]>
---
.../src/airflow/api_fastapi/common/parameters.py | 56 ++++++++++++++++++++++
.../core_api/openapi/v2-rest-api-generated.yaml | 12 +++++
.../core_api/routes/public/task_instances.py | 7 ++-
.../src/airflow/ui/openapi-gen/queries/common.ts | 5 +-
.../ui/openapi-gen/queries/ensureQueryData.ts | 6 ++-
.../src/airflow/ui/openapi-gen/queries/prefetch.ts | 6 ++-
.../src/airflow/ui/openapi-gen/queries/queries.ts | 6 ++-
.../src/airflow/ui/openapi-gen/queries/suspense.ts | 6 ++-
.../ui/openapi-gen/requests/services.gen.ts | 2 +
.../airflow/ui/openapi-gen/requests/types.gen.ts | 4 ++
.../TaskInstance/ClearGroupTaskInstanceDialog.tsx | 2 +-
.../ui/src/pages/Task/Overview/Overview.tsx | 4 +-
.../ui/src/pages/TaskInstances/TaskInstances.tsx | 3 +-
.../core_api/routes/public/test_task_instances.py | 42 +++++++++++++---
.../files/simple_auth_manager_passwords.json | 2 +-
15 files changed, 141 insertions(+), 22 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index 123b6d9841f..1d48187f1ce 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -66,6 +66,8 @@ from airflow.utils.types import DagRunType
if TYPE_CHECKING:
from sqlalchemy.sql import ColumnElement, Select
+ from airflow.serialization.serialized_objects import SerializedDAG
+
T = TypeVar("T")
@@ -181,6 +183,57 @@ class _SearchParam(BaseParam[str]):
raise NotImplementedError("Use search_param_factory instead , depends is not implemented.")
+class QueryTaskInstanceTaskGroupFilter(BaseParam[str]):
+ """Task group filter - returns all tasks in the specified group."""
+
+ def __init__(self, dag=None, skip_none: bool = True):
+ super().__init__(skip_none=skip_none)
+ self._dag: None | SerializedDAG = dag
+
+ @property
+ def dag(self) -> None | SerializedDAG:
+ return self._dag
+
+ @dag.setter
+ def dag(self, value: None | SerializedDAG) -> None:
+ self._dag = value
+
+ def to_orm(self, select: Select) -> Select:
+ if self.value is None and self.skip_none:
+ return select
+
+ if not self.dag:
+ raise ValueError("Dag must be set before calling to_orm")
+
+ if not hasattr(self.dag, "task_group"):
+ return select
+
+ # Exact matching on group_id
+ task_groups = self.dag.task_group.get_task_group_dict()
+ task_group = task_groups.get(self.value)
+ if not task_group:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ detail={
+ "reason": "not_found",
+ "message": f"Task group {self.value} not found",
+ },
+ )
+
+ return select.where(TaskInstance.task_id.in_(task.task_id for task in task_group.iter_tasks()))
+
+ @classmethod
+ def depends(
+ cls,
+ value: str | None = Query(
+ alias="task_group_id",
+ default=None,
+ description="Filter by exact task group ID. Returns all tasks within the specified task group.",
+ ),
+ ) -> QueryTaskInstanceTaskGroupFilter:
+ return cls(dag=None).set_value(value)
+
+
def search_param_factory(
attribute: ColumnElement,
pattern_name: str,
@@ -862,6 +915,9 @@ QueryTIExecutorFilter = Annotated[
QueryTITaskDisplayNamePatternSearch = Annotated[
_SearchParam, Depends(search_param_factory(TaskInstance.task_display_name, "task_display_name_pattern"))
]
+QueryTITaskGroupFilter = Annotated[
+ QueryTaskInstanceTaskGroupFilter, Depends(QueryTaskInstanceTaskGroupFilter.depends)
+]
QueryTIDagVersionFilter = Annotated[
FilterParam[list[int]],
Depends(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 5f7d5144e7e..4231c25b16b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -6499,6 +6499,18 @@ paths:
title: Task Display Name Pattern
description: "SQL LIKE expression \u2014 use `%` / `_` wildcards (e.g.
`%customer_%`).\
\ Regular expressions are **not** supported."
+ - name: task_group_id
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ description: Filter by exact task group ID. Returns all tasks within the
+ specified task group.
+ title: Task Group Id
+ description: Filter by exact task group ID. Returns all tasks within the specified
+ task group.
- name: state
in: query
required: false
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 68764b5456a..79e59c009fa 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -49,6 +49,7 @@ from airflow.api_fastapi.common.parameters import (
QueryTIQueueFilter,
QueryTIStateFilter,
QueryTITaskDisplayNamePatternSearch,
+ QueryTITaskGroupFilter,
QueryTITryNumberFilter,
Range,
RangeFilter,
@@ -407,6 +408,7 @@ def get_task_instances(
update_at_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("updated_at", TI))],
duration_range: Annotated[RangeFilter, Depends(float_range_filter_factory("duration", TI))],
task_display_name_pattern: QueryTITaskDisplayNamePatternSearch,
+ task_group_id: QueryTITaskGroupFilter,
state: QueryTIStateFilter,
pool: QueryTIPoolFilter,
queue: QueryTIQueueFilter,
@@ -468,8 +470,10 @@ def get_task_instances(
)
query = query.where(TI.run_id == dag_run_id)
if dag_id != "~":
- get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session)
+ dag = get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session)
query = query.where(TI.dag_id == dag_id)
+ if dag:
+ task_group_id.dag = dag
task_instance_select, total_entries = paginated_select(
statement=query,
@@ -486,6 +490,7 @@ def get_task_instances(
executor,
task_id,
task_display_name_pattern,
+ task_group_id,
version_number,
readable_ti_filter,
try_number,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 994fe2d91c1..de50c91bd1c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -464,7 +464,7 @@ export const
UseTaskInstanceServiceGetMappedTaskInstanceKeyFn = ({ dagId, dagRun
export type TaskInstanceServiceGetTaskInstancesDefaultResponse =
Awaited<ReturnType<typeof TaskInstanceService.getTaskInstances>>;
export type TaskInstanceServiceGetTaskInstancesQueryResult<TData =
TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetTaskInstancesKey =
"TaskInstanceServiceGetTaskInstances";
-export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt,
startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId,
tryNumber, updatedAtGt, updatedAtGte, updated [...]
+export const UseTaskInstanceServiceGetTaskInstancesKeyFn = ({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt,
startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern,
taskGroupId, taskId, tryNumber, updatedAtGt, updatedA [...]
dagId: string;
dagRunId: string;
durationGt?: number;
@@ -497,6 +497,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn =
({ dagId, dagRunId, d
startDateLte?: string;
state?: string[];
taskDisplayNamePattern?: string;
+ taskGroupId?: string;
taskId?: string;
tryNumber?: number[];
updatedAtGt?: string;
@@ -504,7 +505,7 @@ export const UseTaskInstanceServiceGetTaskInstancesKeyFn =
({ dagId, dagRunId, d
updatedAtLt?: string;
updatedAtLte?: string;
versionNumber?: number[];
-}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetTaskInstancesKey,
...(queryKey ?? [{ dagId, dagRunId, durationGt, durationGte, durationLt,
durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset,
operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt,
runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state,
taskDisplayNamePattern, taskId, tryNumber, [...]
+}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetTaskInstancesKey,
...(queryKey ?? [{ dagId, dagRunId, durationGt, durationGte, durationLt,
durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset,
operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt,
runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state,
taskDisplayNamePattern, taskGroupId, taskId [...]
export type TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse =
Awaited<ReturnType<typeof TaskInstanceService.getTaskInstanceTryDetails>>;
export type TaskInstanceServiceGetTaskInstanceTryDetailsQueryResult<TData =
TaskInstanceServiceGetTaskInstanceTryDetailsDefaultResponse, TError = unknown>
= UseQueryResult<TData, TError>;
export const useTaskInstanceServiceGetTaskInstanceTryDetailsKey =
"TaskInstanceServiceGetTaskInstanceTryDetails";
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 41fe0005dcd..b3764204588 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -905,6 +905,7 @@ export const
ensureUseTaskInstanceServiceGetMappedTaskInstanceData = (queryClien
* @param data.durationLte
* @param data.durationLt
* @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_`
wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
+* @param data.taskGroupId Filter by exact task group ID. Returns all tasks
within the specified task group.
* @param data.state
* @param data.pool
* @param data.queue
@@ -919,7 +920,7 @@ export const
ensureUseTaskInstanceServiceGetMappedTaskInstanceData = (queryClien
* @returns TaskInstanceCollectionResponse Successful Response
* @throws ApiError
*/
-export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient:
QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt,
durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset,
operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt,
runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state,
taskDisplayNamePattern, taskId, tryNumber, upd [...]
+export const ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient:
QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt,
durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset,
operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt,
runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state,
taskDisplayNamePattern, taskGroupId, taskId, t [...]
dagId: string;
dagRunId: string;
durationGt?: number;
@@ -952,6 +953,7 @@ export const
ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: Qu
startDateLte?: string;
state?: string[];
taskDisplayNamePattern?: string;
+ taskGroupId?: string;
taskId?: string;
tryNumber?: number[];
updatedAtGt?: string;
@@ -959,7 +961,7 @@ export const
ensureUseTaskInstanceServiceGetTaskInstancesData = (queryClient: Qu
updatedAtLt?: string;
updatedAtLte?: string;
versionNumber?: number[];
-}) => queryClient.ensureQueryData({ queryKey:
Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt,
startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId,
tryNumbe [...]
+}) => queryClient.ensureQueryData({ queryKey:
Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt,
startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern,
taskGroupId, tas [...]
/**
* Get Task Instance Try Details
* Get task instance details by try number.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index fa6162ec588..fbb00335e4d 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -905,6 +905,7 @@ export const
prefetchUseTaskInstanceServiceGetMappedTaskInstance = (queryClient:
* @param data.durationLte
* @param data.durationLt
* @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_`
wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
+* @param data.taskGroupId Filter by exact task group ID. Returns all tasks
within the specified task group.
* @param data.state
* @param data.pool
* @param data.queue
@@ -919,7 +920,7 @@ export const
prefetchUseTaskInstanceServiceGetMappedTaskInstance = (queryClient:
* @returns TaskInstanceCollectionResponse Successful Response
* @throws ApiError
*/
-export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient:
QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt,
durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset,
operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt,
runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state,
taskDisplayNamePattern, taskId, tryNumber, updat [...]
+export const prefetchUseTaskInstanceServiceGetTaskInstances = (queryClient:
QueryClient, { dagId, dagRunId, durationGt, durationGte, durationLt,
durationLte, endDateGt, endDateGte, endDateLt, endDateLte, executor, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, mapIndex, offset,
operator, orderBy, pool, queue, runAfterGt, runAfterGte, runAfterLt,
runAfterLte, startDateGt, startDateGte, startDateLt, startDateLte, state,
taskDisplayNamePattern, taskGroupId, taskId, try [...]
dagId: string;
dagRunId: string;
durationGt?: number;
@@ -952,6 +953,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances
= (queryClient: Quer
startDateLte?: string;
state?: string[];
taskDisplayNamePattern?: string;
+ taskGroupId?: string;
taskId?: string;
tryNumber?: number[];
updatedAtGt?: string;
@@ -959,7 +961,7 @@ export const prefetchUseTaskInstanceServiceGetTaskInstances
= (queryClient: Quer
updatedAtLt?: string;
updatedAtLte?: string;
versionNumber?: number[];
-}) => queryClient.prefetchQuery({ queryKey:
Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt,
startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern, taskId,
tryNumber, [...]
+}) => queryClient.prefetchQuery({ queryKey:
Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt,
startDateGte, startDateLt, startDateLte, state, taskDisplayNamePattern,
taskGroupId, taskI [...]
/**
* Get Task Instance Try Details
* Get task instance details by try number.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 955e5049e60..cd9c9f0e99b 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -905,6 +905,7 @@ export const useTaskInstanceServiceGetMappedTaskInstance =
<TData = Common.TaskI
* @param data.durationLte
* @param data.durationLt
* @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_`
wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
+* @param data.taskGroupId Filter by exact task group ID. Returns all tasks
within the specified task group.
* @param data.state
* @param data.pool
* @param data.queue
@@ -919,7 +920,7 @@ export const useTaskInstanceServiceGetMappedTaskInstance =
<TData = Common.TaskI
* @returns TaskInstanceCollectionResponse Successful Response
* @throws ApiError
*/
-export const useTaskInstanceServiceGetTaskInstances = <TData =
Common.TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt,
durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt,
endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt,
logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt,
runAfterGte, runAfterLt, runAfterLte, startDateGt, [...]
+export const useTaskInstanceServiceGetTaskInstances = <TData =
Common.TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt,
durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt,
endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt,
logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt,
runAfterGte, runAfterLt, runAfterLte, startDateGt, [...]
dagId: string;
dagRunId: string;
durationGt?: number;
@@ -952,6 +953,7 @@ export const useTaskInstanceServiceGetTaskInstances =
<TData = Common.TaskInstan
startDateLte?: string;
state?: string[];
taskDisplayNamePattern?: string;
+ taskGroupId?: string;
taskId?: string;
tryNumber?: number[];
updatedAtGt?: string;
@@ -959,7 +961,7 @@ export const useTaskInstanceServiceGetTaskInstances =
<TData = Common.TaskInstan
updatedAtLt?: string;
updatedAtLte?: string;
versionNumber?: number[];
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt [...]
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, startDateGt [...]
/**
* Get Task Instance Try Details
* Get task instance details by try number.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index aafe12ed9bc..460cad0b049 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -905,6 +905,7 @@ export const
useTaskInstanceServiceGetMappedTaskInstanceSuspense = <TData = Comm
* @param data.durationLte
* @param data.durationLt
* @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_`
wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
+* @param data.taskGroupId Filter by exact task group ID. Returns all tasks
within the specified task group.
* @param data.state
* @param data.pool
* @param data.queue
@@ -919,7 +920,7 @@ export const
useTaskInstanceServiceGetMappedTaskInstanceSuspense = <TData = Comm
* @returns TaskInstanceCollectionResponse Successful Response
* @throws ApiError
*/
-export const useTaskInstanceServiceGetTaskInstancesSuspense = <TData =
Common.TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt,
durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt,
endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt,
logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt,
runAfterGte, runAfterLt, runAfterLte, star [...]
+export const useTaskInstanceServiceGetTaskInstancesSuspense = <TData =
Common.TaskInstanceServiceGetTaskInstancesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, durationGt,
durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt,
endDateLte, executor, limit, logicalDateGt, logicalDateGte, logicalDateLt,
logicalDateLte, mapIndex, offset, operator, orderBy, pool, queue, runAfterGt,
runAfterGte, runAfterLt, runAfterLte, star [...]
dagId: string;
dagRunId: string;
durationGt?: number;
@@ -952,6 +953,7 @@ export const useTaskInstanceServiceGetTaskInstancesSuspense
= <TData = Common.Ta
startDateLte?: string;
state?: string[];
taskDisplayNamePattern?: string;
+ taskGroupId?: string;
taskId?: string;
tryNumber?: number[];
updatedAtGt?: string;
@@ -959,7 +961,7 @@ export const useTaskInstanceServiceGetTaskInstancesSuspense
= <TData = Common.Ta
updatedAtLt?: string;
updatedAtLte?: string;
versionNumber?: number[];
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, sta [...]
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseTaskInstanceServiceGetTaskInstancesKeyFn({ dagId, dagRunId,
durationGt, durationGte, durationLt, durationLte, endDateGt, endDateGte,
endDateLt, endDateLte, executor, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, mapIndex, offset, operator, orderBy, pool,
queue, runAfterGt, runAfterGte, runAfterLt, runAfterLte, sta [...]
/**
* Get Task Instance Try Details
* Get task instance details by try number.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 91ded2463e7..153da21a141 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -2315,6 +2315,7 @@ export class TaskInstanceService {
* @param data.durationLte
* @param data.durationLt
* @param data.taskDisplayNamePattern SQL LIKE expression — use `%` / `_`
wildcards (e.g. `%customer_%`). Regular expressions are **not** supported.
+ * @param data.taskGroupId Filter by exact task group ID. Returns all
tasks within the specified task group.
* @param data.state
* @param data.pool
* @param data.queue
@@ -2364,6 +2365,7 @@ export class TaskInstanceService {
duration_lte: data.durationLte,
duration_lt: data.durationLt,
task_display_name_pattern: data.taskDisplayNamePattern,
+ task_group_id: data.taskGroupId,
state: data.state,
pool: data.pool,
queue: data.queue,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 1a1bf0f50e9..eaf8190a369 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -2775,6 +2775,10 @@ export type GetTaskInstancesData = {
* SQL LIKE expression — use `%` / `_` wildcards (e.g. `%customer_%`).
Regular expressions are **not** supported.
*/
taskDisplayNamePattern?: string | null;
+ /**
+ * Filter by exact task group ID. Returns all tasks within the specified
task group.
+ */
+ taskGroupId?: string | null;
taskId?: string | null;
tryNumber?: Array<(number)>;
updatedAtGt?: string | null;
diff --git a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
index b365a181612..7821c3598b6 100644
--- a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
+++ b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
@@ -66,7 +66,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open,
taskInstance }: Pr
{
dagId,
dagRunId: runId,
- taskDisplayNamePattern: groupId,
+ taskGroupId: groupId,
},
undefined,
{
diff --git a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx
index d003174b987..8940efad08d 100644
--- a/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/Task/Overview/Overview.tsx
@@ -50,7 +50,7 @@ export const Overview = () => {
runAfterGte: startDate,
runAfterLte: endDate,
state: ["failed"],
- taskDisplayNamePattern: groupId ?? undefined,
+ taskGroupId: groupId ?? undefined,
taskId: Boolean(groupId) ? undefined : taskId,
});
@@ -60,7 +60,7 @@ export const Overview = () => {
dagRunId: "~",
limit: 14,
orderBy: ["-run_after"],
- taskDisplayNamePattern: groupId ?? undefined,
+ taskGroupId: groupId ?? undefined,
taskId: Boolean(groupId) ? undefined : taskId,
},
undefined,
diff --git a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
index c51430b9935..911a03debe4 100644
--- a/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/TaskInstances/TaskInstances.tsx
@@ -251,7 +251,8 @@ export const TaskInstances = () => {
pool: hasFilteredPool ? pool : undefined,
startDateGte: startDate ?? undefined,
state: hasFilteredState ? filteredState : undefined,
- taskDisplayNamePattern: groupId ?? taskDisplayNamePattern ?? undefined,
+ taskDisplayNamePattern: taskDisplayNamePattern ?? undefined,
+ taskGroupId: groupId ?? undefined,
taskId: Boolean(groupId) ? undefined : taskId,
},
undefined,
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index a353b110791..ee5e9724784 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -1191,6 +1191,24 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
3,
id="test task_display_name_pattern filter",
),
+ pytest.param(
+ "task_group_test",
+ True,
+ ("/dags/example_task_group/dagRuns/TEST_DAG_RUN_ID/taskInstances"),
+ {"task_group_id": "section_1"},
+ 3,
+ 7,
+ id="test task_group filter with exact match",
+ ),
+ pytest.param(
+ "task_group_test",
+ True,
+ ("/dags/example_task_group/dagRuns/TEST_DAG_RUN_ID/taskInstances"),
+ {"task_group_id": "section_2"},
+ 4, # section_2 has 4 tasks: task_1 + inner_section_2 (task_2, task_3, task_4)
+ 7,
+ id="test task_group filter exact match on group_id",
+ ),
pytest.param(
[
{"task_id": "task_match_id_1"},
@@ -1294,12 +1312,24 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
expected_queries_number,
session,
):
- self.create_task_instances(
- session,
- update_extras=update_extras,
- task_instances=task_instances,
- )
- with mock.patch("airflow.models.dag_version.DagBundlesManager"):
+ # Special handling for dag_id_pattern tests that require multiple DAGs
+ if task_instances == "dag_id_pattern_test":
+ # Create task instances for multiple DAGs like the original test_dag_id_pattern_filter
+ dag1_id = "example_python_operator"
+ dag2_id = "example_skip_dag"
+ self.create_task_instances(session, dag_id=dag1_id)
+ self.create_task_instances(session, dag_id=dag2_id)
+ elif task_instances == "task_group_test":
+ # test with task group expansion
+ self.create_task_instances(session, dag_id="example_task_group")
+ else:
+ self.create_task_instances(
+ session,
+ update_extras=update_extras,
+ task_instances=task_instances,
+ )
+ with mock.patch("airflow.models.dag_version.DagBundlesManager") as dag_bundle_manager_mock:
+ dag_bundle_manager_mock.return_value.view_url.return_value = "some_url"
# Mock DagBundlesManager to avoid checking if dags-folder bundle is configured
with assert_queries_count(expected_queries_number):
response = test_client.get(url, params=params)
diff --git a/dev/breeze/src/airflow_breeze/files/simple_auth_manager_passwords.json b/dev/breeze/src/airflow_breeze/files/simple_auth_manager_passwords.json
index 1f74a7b6616..b02339b749a 100644
--- a/dev/breeze/src/airflow_breeze/files/simple_auth_manager_passwords.json
+++ b/dev/breeze/src/airflow_breeze/files/simple_auth_manager_passwords.json
@@ -1 +1 @@
-{"admin": "admin", "viewer": "viewer", "user": "user", "op": "op"}
+{"admin": "admin"}