This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8f63b828ace Add run after to dag runs api (#46739)
8f63b828ace is described below
commit 8f63b828ace09b1095229f22b0c6d1f0f85ea81a
Author: Brent Bovenzi <[email protected]>
AuthorDate: Fri Feb 14 11:08:56 2025 -0500
Add run after to dag runs api (#46739)
* Fix some UI usage of logical_date vs run_after
* Add run_after as sort param for dag runs endpoints
* Swap gte/lte search for failed dag run button
* Replace another missed logical_date sort
* Update tests
---
airflow/api_fastapi/core_api/datamodels/dag_run.py | 2 ++
.../api_fastapi/core_api/openapi/v1-generated.yaml | 30 ++++++++++++++++++++++
.../api_fastapi/core_api/routes/public/dag_run.py | 11 ++++++--
airflow/api_fastapi/core_api/routes/ui/dags.py | 16 ++++++------
airflow/ui/openapi-gen/queries/common.ts | 6 +++++
airflow/ui/openapi-gen/queries/prefetch.ts | 10 ++++++++
airflow/ui/openapi-gen/queries/queries.ts | 10 ++++++++
airflow/ui/openapi-gen/queries/suspense.ts | 10 ++++++++
airflow/ui/openapi-gen/requests/schemas.gen.ts | 24 +++++++++++++++++
airflow/ui/openapi-gen/requests/services.gen.ts | 4 +++
airflow/ui/openapi-gen/requests/types.gen.ts | 4 +++
airflow/ui/src/pages/Dag/Overview/Overview.tsx | 6 ++---
airflow/ui/src/pages/DagRuns.tsx | 2 +-
.../core_api/routes/public/test_dag_run.py | 27 +++++++++++++++++++
tests/api_fastapi/core_api/routes/ui/test_dags.py | 11 ++++----
15 files changed, 154 insertions(+), 19 deletions(-)
diff --git a/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 77a5bedcde1..77727406bce 100644
--- a/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -116,6 +116,8 @@ class DAGRunsBatchBody(StrictBaseModel):
page_limit: NonNegativeInt = 100
dag_ids: list[str] | None = None
states: list[DagRunState | None] | None = None
+ run_after_gte: AwareDatetime | None = None
+ run_after_lte: AwareDatetime | None = None
logical_date_gte: AwareDatetime | None = None
logical_date_lte: AwareDatetime | None = None
start_date_gte: AwareDatetime | None = None
diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
index 4b38e45fa76..3a27539e261 100644
--- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
+++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml
@@ -2230,6 +2230,24 @@ paths:
minimum: 0
default: 0
title: Offset
+ - name: run_after_gte
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Run After Gte
+ - name: run_after_lte
+ in: query
+ required: false
+ schema:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Run After Lte
- name: logical_date_gte
in: query
required: false
@@ -8418,6 +8436,18 @@ components:
type: array
- type: 'null'
title: States
+ run_after_gte:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Run After Gte
+ run_after_lte:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Run After Lte
logical_date_gte:
anyOf:
- type: string
diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py
index c1ce04c31f9..fc4397447da 100644
--- a/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -269,6 +269,7 @@ def get_dag_runs(
dag_id: str,
limit: QueryLimit,
offset: QueryOffset,
+ run_after: Annotated[RangeFilter, Depends(datetime_range_filter_factory("run_after", DagRun))],
logical_date: Annotated[RangeFilter, Depends(datetime_range_filter_factory("logical_date", DagRun))],
start_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("start_date", DagRun))],
end_date_range: Annotated[RangeFilter, Depends(datetime_range_filter_factory("end_date", DagRun))],
@@ -284,6 +285,7 @@ def get_dag_runs(
"dag_id",
"run_id",
"logical_date",
+ "run_after",
"start_date",
"end_date",
"updated_at",
@@ -314,7 +316,7 @@ def get_dag_runs(
dag_run_select, total_entries = paginated_select(
statement=query,
- filters=[logical_date, start_date_range, end_date_range, update_at_range, state],
+ filters=[run_after, logical_date, start_date_range, end_date_range, update_at_range, state],
order_by=order_by,
offset=offset,
limit=limit,
@@ -416,6 +418,10 @@ def get_list_dag_runs_batch(
Range(lower_bound=body.logical_date_gte, upper_bound=body.logical_date_lte),
attribute=DagRun.logical_date,
)
+ run_after = RangeFilter(
+ Range(lower_bound=body.run_after_gte, upper_bound=body.run_after_lte),
+ attribute=DagRun.run_after,
+ )
start_date = RangeFilter(
Range(lower_bound=body.start_date_gte, upper_bound=body.start_date_lte),
attribute=DagRun.start_date,
@@ -434,6 +440,7 @@ def get_list_dag_runs_batch(
"id",
"state",
"dag_id",
+ "run_after",
"logical_date",
"run_id",
"start_date",
@@ -449,7 +456,7 @@ def get_list_dag_runs_batch(
base_query = select(DagRun)
dag_runs_select, total_entries = paginated_select(
statement=base_query,
- filters=[dag_ids, logical_date, start_date, end_date, state],
+ filters=[dag_ids, logical_date, run_after, start_date, end_date, state],
order_by=order_by,
offset=offset,
limit=limit,
diff --git a/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow/api_fastapi/core_api/routes/ui/dags.py
index 0ed999c1c99..e89855fe542 100644
--- a/airflow/api_fastapi/core_api/routes/ui/dags.py
+++ b/airflow/api_fastapi/core_api/routes/ui/dags.py
@@ -74,39 +74,39 @@ def recent_dag_runs(
recent_runs_subquery = (
select(
DagRun.dag_id,
- DagRun.logical_date,
+ DagRun.run_after,
func.rank()
.over(
partition_by=DagRun.dag_id,
- order_by=DagRun.logical_date.desc(),
+ order_by=DagRun.run_after.desc(),
)
.label("rank"),
)
- .order_by(DagRun.logical_date.desc())
+ .order_by(DagRun.run_after.desc())
.subquery()
)
dags_with_recent_dag_runs_select = (
select(
DagRun,
DagModel,
- recent_runs_subquery.c.logical_date,
+ recent_runs_subquery.c.run_after,
)
.join(DagModel, DagModel.dag_id == recent_runs_subquery.c.dag_id)
.join(
DagRun,
and_(
DagRun.dag_id == DagModel.dag_id,
- DagRun.logical_date == recent_runs_subquery.c.logical_date,
+ DagRun.run_after == recent_runs_subquery.c.run_after,
),
)
.where(recent_runs_subquery.c.rank <= dag_runs_limit)
.group_by(
DagModel.dag_id,
- recent_runs_subquery.c.logical_date,
- DagRun.logical_date,
+ recent_runs_subquery.c.run_after,
+ DagRun.run_after,
DagRun.id,
)
- .order_by(recent_runs_subquery.c.logical_date.desc())
+ .order_by(recent_runs_subquery.c.run_after.desc())
)
dags_with_recent_dag_runs_select_filter, _ = paginated_select(
statement=dags_with_recent_dag_runs_select,
diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts
index d89d1857839..5d9d328739d 100644
--- a/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow/ui/openapi-gen/queries/common.ts
@@ -578,6 +578,8 @@ export const UseDagRunServiceGetDagRunsKeyFn = (
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
@@ -592,6 +594,8 @@ export const UseDagRunServiceGetDagRunsKeyFn = (
logicalDateLte?: string;
offset?: number;
orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
startDateGte?: string;
startDateLte?: string;
state?: string[];
@@ -611,6 +615,8 @@ export const UseDagRunServiceGetDagRunsKeyFn = (
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts
index b3c706fad96..b2ee8421e4c 100644
--- a/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -771,6 +771,8 @@ export const prefetchUseDagRunServiceGetUpstreamAssetEvents = (
* @param data.dagId
* @param data.limit
* @param data.offset
+ * @param data.runAfterGte
+ * @param data.runAfterLte
* @param data.logicalDateGte
* @param data.logicalDateLte
* @param data.startDateGte
@@ -795,6 +797,8 @@ export const prefetchUseDagRunServiceGetDagRuns = (
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
@@ -809,6 +813,8 @@ export const prefetchUseDagRunServiceGetDagRuns = (
logicalDateLte?: string;
offset?: number;
orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
startDateGte?: string;
startDateLte?: string;
state?: string[];
@@ -826,6 +832,8 @@ export const prefetchUseDagRunServiceGetDagRuns = (
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
@@ -842,6 +850,8 @@ export const prefetchUseDagRunServiceGetDagRuns = (
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts
index 531102d2dd2..226127fc337 100644
--- a/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow/ui/openapi-gen/queries/queries.ts
@@ -939,6 +939,8 @@ export const useDagRunServiceGetUpstreamAssetEvents = <
* @param data.dagId
* @param data.limit
* @param data.offset
+ * @param data.runAfterGte
+ * @param data.runAfterLte
* @param data.logicalDateGte
* @param data.logicalDateLte
* @param data.startDateGte
@@ -966,6 +968,8 @@ export const useDagRunServiceGetDagRuns = <
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
@@ -980,6 +984,8 @@ export const useDagRunServiceGetDagRuns = <
logicalDateLte?: string;
offset?: number;
orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
startDateGte?: string;
startDateLte?: string;
state?: string[];
@@ -1000,6 +1006,8 @@ export const useDagRunServiceGetDagRuns = <
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
@@ -1018,6 +1026,8 @@ export const useDagRunServiceGetDagRuns = <
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts
index 8b304aa8d6d..27a58d791ed 100644
--- a/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow/ui/openapi-gen/queries/suspense.ts
@@ -916,6 +916,8 @@ export const useDagRunServiceGetUpstreamAssetEventsSuspense = <
* @param data.dagId
* @param data.limit
* @param data.offset
+ * @param data.runAfterGte
+ * @param data.runAfterLte
* @param data.logicalDateGte
* @param data.logicalDateLte
* @param data.startDateGte
@@ -943,6 +945,8 @@ export const useDagRunServiceGetDagRunsSuspense = <
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
@@ -957,6 +961,8 @@ export const useDagRunServiceGetDagRunsSuspense = <
logicalDateLte?: string;
offset?: number;
orderBy?: string;
+ runAfterGte?: string;
+ runAfterLte?: string;
startDateGte?: string;
startDateLte?: string;
state?: string[];
@@ -977,6 +983,8 @@ export const useDagRunServiceGetDagRunsSuspense = <
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
@@ -995,6 +1003,8 @@ export const useDagRunServiceGetDagRunsSuspense = <
logicalDateLte,
offset,
orderBy,
+ runAfterGte,
+ runAfterLte,
startDateGte,
startDateLte,
state,
diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 263efb9c897..ac5d05377e5 100644
--- a/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -2418,6 +2418,30 @@ export const $DAGRunsBatchBody = {
],
title: "States",
},
+ run_after_gte: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Run After Gte",
+ },
+ run_after_lte: {
+ anyOf: [
+ {
+ type: "string",
+ format: "date-time",
+ },
+ {
+ type: "null",
+ },
+ ],
+ title: "Run After Lte",
+ },
logical_date_gte: {
anyOf: [
{
diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts
index 995e95e2ea3..a56ebe88658 100644
--- a/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -1371,6 +1371,8 @@ export class DagRunService {
* @param data.dagId
* @param data.limit
* @param data.offset
+ * @param data.runAfterGte
+ * @param data.runAfterLte
* @param data.logicalDateGte
* @param data.logicalDateLte
* @param data.startDateGte
@@ -1394,6 +1396,8 @@ export class DagRunService {
query: {
limit: data.limit,
offset: data.offset,
+ run_after_gte: data.runAfterGte,
+ run_after_lte: data.runAfterLte,
logical_date_gte: data.logicalDateGte,
logical_date_lte: data.logicalDateLte,
start_date_gte: data.startDateGte,
diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts
index 325fd93bf84..bf2b8bf6f67 100644
--- a/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -628,6 +628,8 @@ export type DAGRunsBatchBody = {
page_limit?: number;
dag_ids?: Array<string> | null;
states?: Array<DagRunState | null> | null;
+ run_after_gte?: string | null;
+ run_after_lte?: string | null;
logical_date_gte?: string | null;
logical_date_lte?: string | null;
start_date_gte?: string | null;
@@ -1885,6 +1887,8 @@ export type GetDagRunsData = {
logicalDateLte?: string | null;
offset?: number;
orderBy?: string;
+ runAfterGte?: string | null;
+ runAfterLte?: string | null;
startDateGte?: string | null;
startDateLte?: string | null;
state?: Array<string>;
diff --git a/airflow/ui/src/pages/Dag/Overview/Overview.tsx b/airflow/ui/src/pages/Dag/Overview/Overview.tsx
index 9f512b800ad..3e6e213196b 100644
--- a/airflow/ui/src/pages/Dag/Overview/Overview.tsx
+++ b/airflow/ui/src/pages/Dag/Overview/Overview.tsx
@@ -45,15 +45,15 @@ export const Overview = () => {
const { data: failedRuns, isLoading: isLoadingFailedRuns } =
useDagRunServiceGetDagRuns({
dagId: dagId ?? "",
- logicalDateGte: startDate,
- logicalDateLte: endDate,
+ runAfterGte: startDate,
+ runAfterLte: endDate,
state: ["failed"],
});
const { data: runs, isLoading: isLoadingRuns } = useDagRunServiceGetDagRuns({
dagId: dagId ?? "",
limit: 14,
- orderBy: "-logical_date",
+ orderBy: "-run_after",
});
return (
diff --git a/airflow/ui/src/pages/DagRuns.tsx b/airflow/ui/src/pages/DagRuns.tsx
index 0047f79f147..a98df1c70ea 100644
--- a/airflow/ui/src/pages/DagRuns.tsx
+++ b/airflow/ui/src/pages/DagRuns.tsx
@@ -122,7 +122,7 @@ export const DagRuns = () => {
const { setTableURLState, tableURLState } = useTableURLState();
const { pagination, sorting } = tableURLState;
const [sort] = sorting;
- const orderBy = sort ? `${sort.desc ? "-" : ""}${sort.id}` : "-logical_date";
+ const orderBy = sort ? `${sort.desc ? "-" : ""}${sort.id}` : "-run_after";
const filteredState = searchParams.get(STATE_PARAM);
diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
index 99a2ae83acc..df4dc708d94 100644
--- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -66,6 +66,8 @@ DAG2_RUN2_TRIGGERED_BY = DagRunTriggeredByType.REST_API
START_DATE1 = datetime(2024, 1, 15, 0, 0, tzinfo=timezone.utc)
LOGICAL_DATE1 = datetime(2024, 2, 16, 0, 0, tzinfo=timezone.utc)
LOGICAL_DATE2 = datetime(2024, 2, 20, 0, 0, tzinfo=timezone.utc)
+RUN_AFTER1 = datetime(2024, 2, 16, 0, 0, tzinfo=timezone.utc)
+RUN_AFTER2 = datetime(2024, 2, 20, 0, 0, tzinfo=timezone.utc)
START_DATE2 = datetime(2024, 4, 15, 0, 0, tzinfo=timezone.utc)
LOGICAL_DATE3 = datetime(2024, 5, 16, 0, 0, tzinfo=timezone.utc)
LOGICAL_DATE4 = datetime(2024, 5, 25, 0, 0, tzinfo=timezone.utc)
@@ -397,6 +399,14 @@ class TestGetDagRuns:
},
[DAG1_RUN1_ID, DAG1_RUN2_ID],
),
+ (
+ DAG1_ID,
+ {
+ "run_after_gte": RUN_AFTER1.isoformat(),
+ "run_after_lte": RUN_AFTER2.isoformat(),
+ },
+ [DAG1_RUN1_ID, DAG1_RUN2_ID],
+ ),
(
DAG2_ID,
{
@@ -436,11 +446,27 @@ class TestGetDagRuns:
"logical_date_gte": "invalid",
"start_date_gte": "invalid",
"end_date_gte": "invalid",
+ "run_after_gte": "invalid",
"logical_date_lte": "invalid",
"start_date_lte": "invalid",
"end_date_lte": "invalid",
+ "run_after_lte": "invalid",
}
expected_detail = [
+ {
+ "type": "datetime_from_date_parsing",
+ "loc": ["query", "run_after_gte"],
+ "msg": "Input should be a valid datetime or date, input is too short",
+ "input": "invalid",
+ "ctx": {"error": "input is too short"},
+ },
+ {
+ "type": "datetime_from_date_parsing",
+ "loc": ["query", "run_after_lte"],
+ "msg": "Input should be a valid datetime or date, input is too short",
+ "input": "invalid",
+ "ctx": {"error": "input is too short"},
+ },
{
"type": "datetime_from_date_parsing",
"loc": ["query", "logical_date_gte"],
@@ -577,6 +603,7 @@ class TestListDagRunsBatch:
"state", [DAG1_RUN2_ID, DAG1_RUN1_ID, DAG2_RUN1_ID, DAG2_RUN2_ID], id="order_by_state"
),
pytest.param("dag_id", DAG_RUNS_LIST, id="order_by_dag_id"),
+ pytest.param("run_after", DAG_RUNS_LIST, id="order_by_run_after"),
pytest.param("logical_date", DAG_RUNS_LIST, id="order_by_logical_date"),
pytest.param("dag_run_id", DAG_RUNS_LIST, id="order_by_dag_run_id"),
pytest.param("start_date", DAG_RUNS_LIST, id="order_by_start_date"),
diff --git a/tests/api_fastapi/core_api/routes/ui/test_dags.py b/tests/api_fastapi/core_api/routes/ui/test_dags.py
index 5e7cee8096c..fc6d5093569 100644
--- a/tests/api_fastapi/core_api/routes/ui/test_dags.py
+++ b/tests/api_fastapi/core_api/routes/ui/test_dags.py
@@ -53,6 +53,7 @@ class TestRecentDagRuns(TestPublicDagEndpoint):
run_type=DagRunType.MANUAL,
start_date=start_date,
logical_date=start_date,
+ run_after=start_date,
state=(DagRunState.FAILED if i % 2 == 0 else DagRunState.SUCCESS),
triggered_by=DagRunTriggeredByType.TEST,
)
@@ -90,16 +91,16 @@ class TestRecentDagRuns(TestPublicDagEndpoint):
"dag_run_id",
"dag_id",
"state",
- "logical_date",
+ "run_after",
]
for recent_dag_runs in body["dags"]:
dag_runs = recent_dag_runs["latest_dag_runs"]
# check date ordering
- previous_logical_date = None
+ previous_run_after = None
for dag_run in dag_runs:
# validate the response
for key in required_dag_run_key:
assert key in dag_run
- if previous_logical_date:
- assert previous_logical_date > dag_run["logical_date"]
- previous_logical_date = dag_run["logical_date"]
+ if previous_run_after:
+ assert previous_run_after > dag_run["run_after"]
+ previous_run_after = dag_run["run_after"]