pierrejeambrun commented on code in PR #63665:
URL: https://github.com/apache/airflow/pull/63665#discussion_r3022247454
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py:
##########
@@ -790,6 +790,53 @@ def test_patch_dags(
assert paused_dag_ids == set(expected_paused_ids)
check_last_log(session, dag_id=DAG1_ID, event="patch_dag", logical_date=None)
+ @pytest.mark.parametrize(
+ ("tags_to_add", "query_params", "body", "expected_ids", "expected_paused_ids"),
+ [
+ (
+ [(DAG1_ID, "tag_1"), (DAG2_ID, "tag_1")],
+ {"dag_id_pattern": "~", "tags": ["tag_1"], "tags_match_mode": "any"},
+ {"is_paused": True},
+ [DAG1_ID, DAG2_ID],
+ [DAG1_ID, DAG2_ID],
+ ),
+ (
+ [(DAG1_ID, "tag_1"), (DAG2_ID, "tag_2")],
+ {"dag_id_pattern": "~", "tags": ["tag_1", "tag_2"], "tags_match_mode": "any"},
+ {"is_paused": True},
+ [DAG1_ID, DAG2_ID],
+ [DAG1_ID, DAG2_ID],
+ ),
+ (
+ [(DAG1_ID, "tag_1"), (DAG1_ID, "tag_3"), (DAG2_ID, "tag_1"), (DAG2_ID, "tag_3")],
+ {"dag_id_pattern": "~", "tags": ["tag_1", "tag_3"], "tags_match_mode": "all"},
+ {"is_paused": True},
+ [DAG1_ID, DAG2_ID],
+ [DAG1_ID, DAG2_ID],
+ ),
+ ],
+ )
+ def test_patch_dags_by_tags(
+ self,
+ test_client,
+ tags_to_add,
+ query_params,
+ body,
+ expected_ids,
+ expected_paused_ids,
+ session,
+ ):
+ for dag_id, tag_name in tags_to_add:
+ session.add(DagTag(dag_id=dag_id, name=tag_name))
+ session.commit()
+
+ response = test_client.patch("/dags", json=body, params=query_params)
+ assert response.status_code == 200
+ resp_body = response.json()
+ assert {dag["dag_id"] for dag in resp_body["dags"]} == set(expected_ids)
+ paused_dag_ids = {dag["dag_id"] for dag in resp_body["dags"] if dag["is_paused"]}
+ assert paused_dag_ids == set(expected_paused_ids)
+
Review Comment:
Also we probably should add a test for pausing multiple dags, when 'limit' =
1. To verify that dags are paused even across multiple pages.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dags.py:
##########
@@ -356,7 +362,26 @@ def patch_dags(
session=session,
)
dags = session.scalars(dags_select).all()
- dags_to_update = {dag.dag_id for dag in dags}
+ dags_to_update: set[str] = set()
+
+ for current_offset in range(0, total_entries, limit.value):  # type: ignore[arg-type]
+ dags_select, _ = paginated_select(
+ statement=select(DagModel.dag_id),
+ filters=[
+ exclude_stale,
+ paused,
+ dag_id_pattern,
+ tags,
+ owners,
+ editable_dags_filter,
+ ],
+ order_by=None,
+ offset=QueryOffset.depends(current_offset),
+ limit=limit,
+ session=session,
+ )
+ dags_to_update.update(session.scalars(dags_select).all())
Review Comment:
I just realized that there probably is a better approach than this, because
this will make as many db requests as the number of pages.
Instead by using a subquery-based update such as:
```
filtered_dag_ids = apply_filters_to_select(
statement=select(DagModel.dag_id),
filters=[exclude_stale, paused, dag_id_pattern, tags, owners,
editable_dags_filter],
)
session.execute(
update(DagModel)
.where(DagModel.dag_id.in_(filtered_dag_ids))
.values(is_paused=patch_body.is_paused)
.execution_options(synchronize_session="fetch")
)
```
This way we call the db only once, and get rid of the looping / type
ignores, etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]