This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9504886608f Fix: PATCH /dags pagination bug and document wildcard
dag_id_pattern (#63665)
9504886608f is described below
commit 9504886608f9b54398f757b4eb6fc3442141d28a
Author: Justin Pakzad <[email protected]>
AuthorDate: Fri Apr 3 10:31:18 2026 -0400
Fix: PATCH /dags pagination bug and document wildcard dag_id_pattern
(#63665)
* fixed pagination bug and updated docstring to clarify dag_id_pattern
wildcard usage
* removed batch loop to update all dags in one shot and added additional
test case
* Fixed MySQL subquery issue
---
.../core_api/openapi/v2-rest-api-generated.yaml | 9 +++-
.../api_fastapi/core_api/routes/public/dags.py | 29 ++++++++---
.../src/airflow/ui/openapi-gen/queries/queries.ts | 4 ++
.../ui/openapi-gen/requests/services.gen.ts | 4 ++
.../core_api/routes/public/test_dags.py | 60 ++++++++++++++++++++++
5 files changed, 98 insertions(+), 8 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index ace51229702..bd8e6aa62ba 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -3301,7 +3301,14 @@ paths:
tags:
- DAG
summary: Patch Dags
- description: Patch multiple DAGs.
+ description: 'Patch multiple DAGs.
+
+
+ If `dag_id_pattern` is not provided, no DAGs will be matched regardless
+
+ of other filters. To match all DAGs, pass a wildcard value such as `~`
+
+ or `%` for `dag_id_pattern`.'
operationId: patch_dags
security:
- OAuth2PasswordBearer: []
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
index 6fbb7c831e9..b83cc1ef223 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py
@@ -26,10 +26,7 @@ from sqlalchemy import delete, func, insert, select, update
from airflow.api.common import delete_dag as delete_dag_module
from airflow.api_fastapi.common.dagbag import DagBagDep,
get_latest_version_of_dag
-from airflow.api_fastapi.common.db.common import (
- SessionDep,
- paginated_select,
-)
+from airflow.api_fastapi.common.db.common import SessionDep,
apply_filters_to_select, paginated_select
from airflow.api_fastapi.common.db.dags import
generate_dag_with_latest_run_query
from airflow.api_fastapi.common.parameters import (
FilterOptionEnum,
@@ -328,7 +325,13 @@ def patch_dags(
session: SessionDep,
update_mask: list[str] | None = Query(None),
) -> DAGCollectionResponse:
- """Patch multiple DAGs."""
+ """
+ Patch multiple DAGs.
+
+ If `dag_id_pattern` is not provided, no DAGs will be matched regardless
+ of other filters. To match all DAGs, pass a wildcard value such as `~`
+ or `%` for `dag_id_pattern`.
+ """
if update_mask:
if update_mask != ["is_paused"]:
raise HTTPException(
@@ -356,10 +359,22 @@ def patch_dags(
session=session,
)
dags = session.scalars(dags_select).all()
- dags_to_update = {dag.dag_id for dag in dags}
+
+ filtered_dag_ids = apply_filters_to_select(
+ statement=select(DagModel.dag_id),
+ filters=[
+ exclude_stale,
+ paused,
+ dag_id_pattern,
+ tags,
+ owners,
+ editable_dags_filter,
+ ],
+ ).subquery()
+
session.execute(
update(DagModel)
- .where(DagModel.dag_id.in_(dags_to_update))
+ .where(DagModel.dag_id.in_(select(filtered_dag_ids.c.dag_id)))
.values(is_paused=patch_body.is_paused)
.execution_options(synchronize_session="fetch")
)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index 52981b15629..40b61f1e1f4 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -2155,6 +2155,10 @@ export const useDagRunServicePatchDagRun = <TData =
Common.DagRunServicePatchDag
/**
* Patch Dags
* Patch multiple DAGs.
+*
+* If `dag_id_pattern` is not provided, no DAGs will be matched regardless
+* of other filters. To match all DAGs, pass a wildcard value such as `~`
+* or `%` for `dag_id_pattern`.
* @param data The data for the request.
* @param data.requestBody
* @param data.updateMask
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index c439bef36c2..ce11a219fb0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -1452,6 +1452,10 @@ export class DagService {
/**
* Patch Dags
* Patch multiple DAGs.
+ *
+ * If `dag_id_pattern` is not provided, no DAGs will be matched regardless
+ * of other filters. To match all DAGs, pass a wildcard value such as `~`
+ * or `%` for `dag_id_pattern`.
* @param data The data for the request.
* @param data.requestBody
* @param data.updateMask
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
index 55515b76931..434cd9e07f3 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
@@ -790,6 +790,66 @@ class TestPatchDags(TestDagEndpoint):
assert paused_dag_ids == set(expected_paused_ids)
check_last_log(session, dag_id=DAG1_ID, event="patch_dag",
logical_date=None)
+ @pytest.mark.parametrize(
+ ("tags_to_add", "query_params", "body", "expected_ids",
"expected_paused_ids"),
+ [
+ (
+ [(DAG1_ID, "tag_1"), (DAG2_ID, "tag_1")],
+ {"dag_id_pattern": "~", "tags": ["tag_1"], "tags_match_mode":
"any"},
+ {"is_paused": True},
+ [DAG1_ID, DAG2_ID],
+ [DAG1_ID, DAG2_ID],
+ ),
+ (
+ [(DAG1_ID, "tag_1"), (DAG2_ID, "tag_2")],
+ {"dag_id_pattern": "~", "tags": ["tag_1", "tag_2"],
"tags_match_mode": "any"},
+ {"is_paused": True},
+ [DAG1_ID, DAG2_ID],
+ [DAG1_ID, DAG2_ID],
+ ),
+ (
+ [(DAG1_ID, "tag_1"), (DAG1_ID, "tag_3"), (DAG2_ID, "tag_1"),
(DAG2_ID, "tag_3")],
+ {"dag_id_pattern": "~", "tags": ["tag_1", "tag_3"],
"tags_match_mode": "all"},
+ {"is_paused": True},
+ [DAG1_ID, DAG2_ID],
+ [DAG1_ID, DAG2_ID],
+ ),
+ ],
+ )
+ def test_patch_dags_by_tags(
+ self,
+ test_client,
+ tags_to_add,
+ query_params,
+ body,
+ expected_ids,
+ expected_paused_ids,
+ session,
+ ):
+ for dag_id, tag_name in tags_to_add:
+ session.add(DagTag(dag_id=dag_id, name=tag_name))
+ session.commit()
+
+ response = test_client.patch("/dags", json=body, params=query_params)
+ assert response.status_code == 200
+ resp_body = response.json()
+ assert {dag["dag_id"] for dag in resp_body["dags"]} ==
set(expected_ids)
+ paused_dag_ids = {dag["dag_id"] for dag in resp_body["dags"] if
dag["is_paused"]}
+ assert paused_dag_ids == set(expected_paused_ids)
+
+ def test_patch_dags_updates_all_beyond_limit(self, test_client, session):
+ response = test_client.patch(
+ "/dags",
+ json={"is_paused": True},
+ params={"dag_id_pattern": "~", "limit": 1},
+ )
+ assert response.status_code == 200
+ assert len(response.json()["dags"]) == 1
+ paused_dags = session.scalars(
+ select(DagModel.dag_id).where(DagModel.is_paused,
~DagModel.is_stale)
+ ).all()
+ assert set(paused_dags) == {DAG1_ID, DAG2_ID}
+
@mock.patch("airflow.api_fastapi.auth.managers.base_auth_manager.BaseAuthManager.get_authorized_dag_ids")
def test_patch_dags_should_call_authorized_dag_ids(self,
mock_get_authorized_dag_ids, test_client):
mock_get_authorized_dag_ids.return_value = {DAG1_ID, DAG2_ID}