This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c78dc3e2224 Add dag runs filters (bundleVersion) (#62810)
c78dc3e2224 is described below
commit c78dc3e22240c2836dbdda22aeb2307afada10db
Author: fat-catTW <[email protected]>
AuthorDate: Wed Mar 11 22:05:45 2026 +0800
Add dag runs filters (bundleVersion) (#62810)
* feat(i18n): Complete zh-TW translations - Add 10 missing keys
- Add pendingDagRun (singular and plural) translations
- Add partitionedDagRun (singular and plural) translations
- Add partitionedDagRunDetail.receivedAssetEvents translation
- Add panel.showVersionIndicator label and options translations
- Achieves 100% zh-TW translation coverage (786/786 keys)
* feat(i18n): Change the zh-TW translation of "partition" from 分割 to 分區
and add partition -> 區分 to zh-TW.md
* Remove unrelated files from PR
* Store bundle_version during DAG collection and add tests for dag runs
filtering
* Remove unrelated translation changes from PR
---
.../core_api/openapi/v2-rest-api-generated.yaml | 8 ++++++++
.../api_fastapi/core_api/routes/public/dag_run.py | 4 ++++
airflow-core/src/airflow/dag_processing/collection.py | 8 +++++++-
.../src/airflow/ui/openapi-gen/queries/common.ts | 5 +++--
.../airflow/ui/openapi-gen/queries/ensureQueryData.ts | 6 ++++--
.../src/airflow/ui/openapi-gen/queries/prefetch.ts | 6 ++++--
.../src/airflow/ui/openapi-gen/queries/queries.ts | 6 ++++--
.../src/airflow/ui/openapi-gen/queries/suspense.ts | 6 ++++--
.../src/airflow/ui/openapi-gen/requests/services.gen.ts | 2 ++
.../src/airflow/ui/openapi-gen/requests/types.gen.ts | 1 +
.../src/airflow/ui/src/constants/filterConfigs.tsx | 6 ++++++
.../src/airflow/ui/src/constants/searchParams.ts | 1 +
airflow-core/src/airflow/ui/src/pages/DagRuns.tsx | 3 +++
.../src/airflow/ui/src/pages/DagRunsFilters.tsx | 1 +
.../src/airflow/ui/src/utils/useFiltersHandler.ts | 1 +
.../api_fastapi/core_api/routes/public/test_dag_run.py | 17 +++++++++++++++++
16 files changed, 70 insertions(+), 11 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index d3955872f71..0bf32749927 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2285,6 +2285,14 @@ paths:
items:
type: integer
title: Dag Version
+ - name: bundle_version
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ - type: 'null'
+ title: Bundle Version
- name: order_by
in: query
required: false
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index d0cc597fd51..359809458cf 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -346,6 +346,9 @@ def get_dag_runs(
run_type: QueryDagRunRunTypesFilter,
state: QueryDagRunStateFilter,
dag_version: QueryDagRunVersionFilter,
+ bundle_version: Annotated[
+ FilterParam[str | None], Depends(filter_param_factory(DagRun.bundle_version, str | None))
+ ],
order_by: Annotated[
SortParam,
Depends(
@@ -407,6 +410,7 @@ def get_dag_runs(
state,
run_type,
dag_version,
+ bundle_version,
readable_dag_runs_filter,
run_id_pattern,
triggering_user_name_pattern,
diff --git a/airflow-core/src/airflow/dag_processing/collection.py
b/airflow-core/src/airflow/dag_processing/collection.py
index 5ea7d833151..7754080ae80 100644
--- a/airflow-core/src/airflow/dag_processing/collection.py
+++ b/airflow-core/src/airflow/dag_processing/collection.py
@@ -84,12 +84,17 @@ log = structlog.get_logger(__name__)
def _create_orm_dags(
bundle_name: str,
+ bundle_version: str | None,
dags: Iterable[LazyDeserializedDAG],
*,
session: Session,
) -> Iterator[DagModel]:
for dag in dags:
- orm_dag = DagModel(dag_id=dag.dag_id, bundle_name=bundle_name)
+ orm_dag = DagModel(
+ dag_id=dag.dag_id,
+ bundle_name=bundle_name,
+ bundle_version=bundle_version,
+ )
if dag.is_paused_upon_creation is not None:
orm_dag.is_paused = dag.is_paused_upon_creation
log.info("Creating ORM DAG for %s", dag.dag_id)
@@ -529,6 +534,7 @@ class DagModelOperation(NamedTuple):
(model.dag_id, model)
for model in _create_orm_dags(
bundle_name=self.bundle_name,
+ bundle_version=self.bundle_version,
dags=(dag for dag_id, dag in self.dags.items() if dag_id not
in orm_dags),
session=session,
)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 3e09d977b29..d74622f5a8c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -143,7 +143,8 @@ export const UseDagRunServiceGetUpstreamAssetEventsKeyFn =
({ dagId, dagRunId }:
export type DagRunServiceGetDagRunsDefaultResponse = Awaited<ReturnType<typeof
DagRunService.getDagRuns>>;
export type DagRunServiceGetDagRunsQueryResult<TData =
DagRunServiceGetDagRunsDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useDagRunServiceGetDagRunsKey = "DagRunServiceGetDagRuns";
-export const UseDagRunServiceGetDagRunsKeyFn = ({ confContains, dagId,
dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte,
endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt,
logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte,
state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, upda [...]
+export const UseDagRunServiceGetDagRunsKeyFn = ({ bundleVersion, confContains,
dagId, dagIdPattern, dagVersion, durationGt, durationGte, durationLt,
durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte,
state, triggeringUserNamePattern, updatedAtGt, upd [...]
+ bundleVersion?: string;
confContains?: string;
dagId: string;
dagIdPattern?: string;
@@ -180,7 +181,7 @@ export const UseDagRunServiceGetDagRunsKeyFn = ({
confContains, dagId, dagIdPatt
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
-}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey
?? [{ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte,
durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte,
state, triggeringUserNamePatter [...]
+}, queryKey?: Array<unknown>) => [useDagRunServiceGetDagRunsKey, ...(queryKey
?? [{ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt,
durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt,
endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt,
logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte,
runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte,
startDateLt, startDateLte, state, triggerin [...]
export type DagRunServiceWaitDagRunUntilFinishedDefaultResponse =
Awaited<ReturnType<typeof DagRunService.waitDagRunUntilFinished>>;
export type DagRunServiceWaitDagRunUntilFinishedQueryResult<TData =
DagRunServiceWaitDagRunUntilFinishedDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useDagRunServiceWaitDagRunUntilFinishedKey =
"DagRunServiceWaitDagRunUntilFinished";
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 2b61c294b2e..8ccaf76a47a 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -293,6 +293,7 @@ export const
ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu
* @param data.runType
* @param data.state
* @param data.dagVersion
+* @param data.bundleVersion
* @param data.orderBy Attributes to order by, multi criteria sort is
supported. Prefix with `-` for descending order. Supported attributes: `id,
state, dag_id, run_id, logical_date, run_after, start_date, end_date,
updated_at, conf, duration, dag_run_id`
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g.
`%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`).
Regular expressions are **not** supported.
* @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_`
wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g.
`dag1 | dag2`). Regular expressions are **not** supported.
@@ -301,7 +302,8 @@ export const
ensureUseDagRunServiceGetUpstreamAssetEventsData = (queryClient: Qu
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
-export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient,
{ confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte,
durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte,
state, triggeringUserNamePattern, [...]
+export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient,
{ bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt,
durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt,
endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt,
logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte,
runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte,
startDateLt, startDateLte, state, triggeringUs [...]
+ bundleVersion?: string;
confContains?: string;
dagId: string;
dagIdPattern?: string;
@@ -338,7 +340,7 @@ export const ensureUseDagRunServiceGetDagRunsData =
(queryClient: QueryClient, {
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
-}) => queryClient.ensureQueryData({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType,
startDateGt, startDateGte, startDateLt, startDateLte, state,
triggeringUserNamePat [...]
+}) => queryClient.ensureQueryData({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, dagId,
dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte,
endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt,
logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte,
state, trigge [...]
/**
* Experimental: Wait for a dag run to complete, and return task results if
requested.
* 🚧 This is an experimental endpoint and may change or be removed without
notice.Successful response are streamed as newline-delimited JSON (NDJSON).
Each line is a JSON object representing the DAG run state.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index 43f9ad49c88..57c4174683d 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -293,6 +293,7 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents
= (queryClient: Quer
* @param data.runType
* @param data.state
* @param data.dagVersion
+* @param data.bundleVersion
* @param data.orderBy Attributes to order by, multi criteria sort is
supported. Prefix with `-` for descending order. Supported attributes: `id,
state, dag_id, run_id, logical_date, run_after, start_date, end_date,
updated_at, conf, duration, dag_run_id`
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g.
`%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`).
Regular expressions are **not** supported.
* @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_`
wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g.
`dag1 | dag2`). Regular expressions are **not** supported.
@@ -301,7 +302,8 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents
= (queryClient: Quer
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
-export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, {
confContains, dagId, dagIdPattern, dagVersion, durationGt, durationGte,
durationLt, durationLte, endDateGt, endDateGte, endDateLt, endDateLte, limit,
logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte,
state, triggeringUserNamePattern, up [...]
+export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, {
bundleVersion, confContains, dagId, dagIdPattern, dagVersion, durationGt,
durationGte, durationLt, durationLte, endDateGt, endDateGte, endDateLt,
endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt,
logicalDateLte, offset, orderBy, partitionKeyPattern, runAfterGt, runAfterGte,
runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte,
startDateLt, startDateLte, state, triggeringUser [...]
+ bundleVersion?: string;
confContains?: string;
dagId: string;
dagIdPattern?: string;
@@ -338,7 +340,7 @@ export const prefetchUseDagRunServiceGetDagRuns =
(queryClient: QueryClient, { c
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
-}) => queryClient.prefetchQuery({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType,
startDateGt, startDateGte, startDateLt, startDateLte, state,
triggeringUserNamePatte [...]
+}) => queryClient.prefetchQuery({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, dagId,
dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte,
endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt,
logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte,
state, triggeri [...]
/**
* Experimental: Wait for a dag run to complete, and return task results if
requested.
* 🚧 This is an experimental endpoint and may change or be removed without
notice.Successful response are streamed as newline-delimited JSON (NDJSON).
Each line is a JSON object representing the DAG run state.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 99a22c6f323..33a5ca38816 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -293,6 +293,7 @@ export const useDagRunServiceGetUpstreamAssetEvents =
<TData = Common.DagRunServ
* @param data.runType
* @param data.state
* @param data.dagVersion
+* @param data.bundleVersion
* @param data.orderBy Attributes to order by, multi criteria sort is
supported. Prefix with `-` for descending order. Supported attributes: `id,
state, dag_id, run_id, logical_date, run_after, start_date, end_date,
updated_at, conf, duration, dag_run_id`
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g.
`%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`).
Regular expressions are **not** supported.
* @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_`
wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g.
`dag1 | dag2`). Regular expressions are **not** supported.
@@ -301,7 +302,8 @@ export const useDagRunServiceGetUpstreamAssetEvents =
<TData = Common.DagRunServ
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
-export const useDagRunServiceGetDagRuns = <TData =
Common.DagRunServiceGetDagRunsDefaultResponse, TError = unknown, TQueryKey
extends Array<unknown> = unknown[]>({ confContains, dagId, dagIdPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, start
[...]
+export const useDagRunServiceGetDagRuns = <TData =
Common.DagRunServiceGetDagRunsDefaultResponse, TError = unknown, TQueryKey
extends Array<unknown> = unknown[]>({ bundleVersion, confContains, dagId,
dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte,
endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt,
logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte,
runIdPattern, [...]
+ bundleVersion?: string;
confContains?: string;
dagId: string;
dagIdPattern?: string;
@@ -338,7 +340,7 @@ export const useDagRunServiceGetDagRuns = <TData =
Common.DagRunServiceGetDagRun
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, r [...]
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, dagId,
dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte,
endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt,
logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, [...]
/**
* Experimental: Wait for a dag run to complete, and return task results if
requested.
* 🚧 This is an experimental endpoint and may change or be removed without
notice.Successful response are streamed as newline-delimited JSON (NDJSON).
Each line is a JSON object representing the DAG run state.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index 87d9580e031..2d01abdb58f 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -293,6 +293,7 @@ export const useDagRunServiceGetUpstreamAssetEventsSuspense
= <TData = Common.Da
* @param data.runType
* @param data.state
* @param data.dagVersion
+* @param data.bundleVersion
* @param data.orderBy Attributes to order by, multi criteria sort is
supported. Prefix with `-` for descending order. Supported attributes: `id,
state, dag_id, run_id, logical_date, run_after, start_date, end_date,
updated_at, conf, duration, dag_run_id`
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards (e.g.
`%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 | dag2`).
Regular expressions are **not** supported.
* @param data.triggeringUserNamePattern SQL LIKE expression — use `%` / `_`
wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g.
`dag1 | dag2`). Regular expressions are **not** supported.
@@ -301,7 +302,8 @@ export const useDagRunServiceGetUpstreamAssetEventsSuspense
= <TData = Common.Da
* @returns DAGRunCollectionResponse Successful Response
* @throws ApiError
*/
-export const useDagRunServiceGetDagRunsSuspense = <TData =
Common.DagRunServiceGetDagRunsDefaultResponse, TError = unknown, TQueryKey
extends Array<unknown> = unknown[]>({ confContains, dagId, dagIdPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runTyp [...]
+export const useDagRunServiceGetDagRunsSuspense = <TData =
Common.DagRunServiceGetDagRunsDefaultResponse, TError = unknown, TQueryKey
extends Array<unknown> = unknown[]>({ bundleVersion, confContains, dagId,
dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte,
endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt,
logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runId
[...]
+ bundleVersion?: string;
confContains?: string;
dagId: string;
dagIdPattern?: string;
@@ -338,7 +340,7 @@ export const useDagRunServiceGetDagRunsSuspense = <TData =
Common.DagRunServiceG
updatedAtGte?: string;
updatedAtLt?: string;
updatedAtLte?: string;
-}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ confContains, dagId, dagIdPattern,
dagVersion, durationGt, durationGte, durationLt, durationLte, endDateGt,
endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte,
logicalDateLt, logicalDateLte, offset, orderBy, partitionKeyPattern,
runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPa [...]
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseDagRunServiceGetDagRunsKeyFn({ bundleVersion, confContains, dagId,
dagIdPattern, dagVersion, durationGt, durationGte, durationLt, durationLte,
endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt,
logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy,
partitionKeyPattern, runAfterGt, runAfterGte, runAfterLt, runAf [...]
/**
* Experimental: Wait for a dag run to complete, and return task results if
requested.
* 🚧 This is an experimental endpoint and may change or be removed without
notice.Successful response are streamed as newline-delimited JSON (NDJSON).
Each line is a JSON object representing the DAG run state.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 976a14533cd..d36012e372c 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -1004,6 +1004,7 @@ export class DagRunService {
* @param data.runType
* @param data.state
* @param data.dagVersion
+ * @param data.bundleVersion
* @param data.orderBy Attributes to order by, multi criteria sort is
supported. Prefix with `-` for descending order. Supported attributes: `id,
state, dag_id, run_id, logical_date, run_after, start_date, end_date,
updated_at, conf, duration, dag_run_id`
* @param data.runIdPattern SQL LIKE expression — use `%` / `_` wildcards
(e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g. `dag1 |
dag2`). Regular expressions are **not** supported.
* @param data.triggeringUserNamePattern SQL LIKE expression — use `%` /
`_` wildcards (e.g. `%customer_%`). or the pipe `|` operator for OR logic (e.g.
`dag1 | dag2`). Regular expressions are **not** supported.
@@ -1050,6 +1051,7 @@ export class DagRunService {
run_type: data.runType,
state: data.state,
dag_version: data.dagVersion,
+ bundle_version: data.bundleVersion,
order_by: data.orderBy,
run_id_pattern: data.runIdPattern,
triggering_user_name_pattern: data.triggeringUserNamePattern,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 6794c8ff4fa..038c02ee2cd 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -2535,6 +2535,7 @@ export type ClearDagRunData = {
export type ClearDagRunResponse = TaskInstanceCollectionResponse |
DAGRunResponse;
export type GetDagRunsData = {
+ bundleVersion?: string | null;
confContains?: string;
dagId: string;
/**
diff --git a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx
b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx
index 698d485f93c..691ed35c042 100644
--- a/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx
+++ b/airflow-core/src/airflow/ui/src/constants/filterConfigs.tsx
@@ -77,6 +77,12 @@ export const useFilterConfigs = () => {
label: translate("hitl:filters.body"),
type: FilterTypes.TEXT,
},
+ [SearchParamsKeys.BUNDLE_VERSION]: {
+ hotkeyDisabled: true,
+ icon: <MdCode />,
+ label: translate("common:bundleVersion"),
+ type: FilterTypes.TEXT,
+ },
[SearchParamsKeys.CONF_CONTAINS]: {
hotkeyDisabled: true,
icon: <MdCode />,
diff --git a/airflow-core/src/airflow/ui/src/constants/searchParams.ts
b/airflow-core/src/airflow/ui/src/constants/searchParams.ts
index 0215a55b2da..d37001e7688 100644
--- a/airflow-core/src/airflow/ui/src/constants/searchParams.ts
+++ b/airflow-core/src/airflow/ui/src/constants/searchParams.ts
@@ -21,6 +21,7 @@ export enum SearchParamsKeys {
ASSET_EVENT_DATE_RANGE = "asset_event_date_range",
BEFORE = "before",
BODY_SEARCH = "body_search",
+ BUNDLE_VERSION = "bundle_version",
CONF_CONTAINS = "conf_contains",
CREATED_AT_GTE = "created_at_gte",
CREATED_AT_LTE = "created_at_lte",
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
index a05a756c0a4..94b33640312 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagRuns.tsx
@@ -45,6 +45,7 @@ import { renderDuration, useAutoRefresh, isStatePending }
from "src/utils";
type DagRunRow = { row: { original: DAGRunResponse } };
const {
+ BUNDLE_VERSION: BUNDLE_VERSION_PARAM,
CONF_CONTAINS: CONF_CONTAINS_PARAM,
DAG_ID_PATTERN: DAG_ID_PATTERN_PARAM,
DAG_VERSION: DAG_VERSION_PARAM,
@@ -213,6 +214,7 @@ export const DagRuns = () => {
const filteredTriggeringUserNamePattern =
searchParams.get(TRIGGERING_USER_NAME_PATTERN_PARAM);
const filteredDagIdPattern = searchParams.get(DAG_ID_PATTERN_PARAM);
const filteredDagVersion = searchParams.get(DAG_VERSION_PARAM);
+ const bundleVersion = searchParams.get(BUNDLE_VERSION_PARAM);
const startDateGte = searchParams.get(START_DATE_GTE_PARAM);
const startDateLte = searchParams.get(START_DATE_LTE_PARAM);
const endDateGte = searchParams.get(END_DATE_GTE_PARAM);
@@ -230,6 +232,7 @@ export const DagRuns = () => {
const { data, error, isLoading } = useDagRunServiceGetDagRuns(
{
+ bundleVersion: bundleVersion ?? undefined,
confContains: confContains !== null && confContains !== "" ?
confContains : undefined,
dagId: dagId ?? "~",
dagIdPattern: filteredDagIdPattern ?? undefined,
diff --git a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx
b/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx
index 201b77d0814..b0f90d66f98 100644
--- a/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx
+++ b/airflow-core/src/airflow/ui/src/pages/DagRunsFilters.tsx
@@ -41,6 +41,7 @@ export const DagRunsFilters = ({ dagId }:
DagRunsFiltersProps) => {
SearchParamsKeys.TRIGGERING_USER_NAME_PATTERN,
SearchParamsKeys.DAG_VERSION,
SearchParamsKeys.PARTITION_KEY_PATTERN,
+ SearchParamsKeys.BUNDLE_VERSION,
];
if (dagId === undefined) {
diff --git a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts
b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts
index 2e879ee0c8e..17ab2fc3506 100644
--- a/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts
+++ b/airflow-core/src/airflow/ui/src/utils/useFiltersHandler.ts
@@ -58,6 +58,7 @@ const handleDateRangeChange = (
export type FilterableSearchParamsKeys =
| SearchParamsKeys.ASSET_EVENT_DATE_RANGE
| SearchParamsKeys.BODY_SEARCH
+ | SearchParamsKeys.BUNDLE_VERSION
| SearchParamsKeys.CONF_CONTAINS
| SearchParamsKeys.CREATED_AT_RANGE
| SearchParamsKeys.DAG_DISPLAY_NAME_PATTERN
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index e00d075f2a7..3a55532e3d7 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -793,6 +793,23 @@ class TestGetDagRuns:
body = response.json()
assert body["detail"] == expected_detail
+ @pytest.mark.usefixtures("make_dag_with_multiple_versions")
+ @pytest.mark.parametrize(
+ ("dag_id", "query_params", "expected_dag_run_ids"),
+ [
+ ("dag_with_multiple_versions", {"bundle_version": "some_commit_hash1"}, ["run1"]),
+ ("dag_with_multiple_versions", {"bundle_version": "some_commit_hash2"}, ["run2"]),
+ ("dag_with_multiple_versions", {"bundle_version": "some_commit_hash3"}, ["run3"]),
+ ("~", {"bundle_version": "some_commit_hash2"}, ["run2"]),
+ ("~", {"bundle_version": "does_not_exist"}, []),
+ ],
+ )
+ def test_filter_by_bundle_version(self, test_client, dag_id, query_params, expected_dag_run_ids):
+ response = test_client.get(f"/dags/{dag_id}/dagRuns", params=query_params)
+ assert response.status_code == 200
+ body = response.json()
+ assert [each["dag_run_id"] for each in body["dag_runs"]] == expected_dag_run_ids
+
def test_invalid_state(self, test_client):
response = test_client.get(f"/dags/{DAG1_ID}/dagRuns",
params={"state": ["invalid"]})
assert response.status_code == 422