This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 68c781cea4e [v3-2-test] Fix N+1 query pattern in bulk pool delete
endpoint (#66222) (#67108)
68c781cea4e is described below
commit 68c781cea4e085ac92c97c45d4dda9e20965db14
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon May 18 20:04:06 2026 +0530
[v3-2-test] Fix N+1 query pattern in bulk pool delete endpoint (#66222)
(#67108)
* Fix N+1 query pattern in bulk pool delete endpoint
* Add query-count test for bulk pool delete
* Refactor bulk-delete query-count test to parametrize
* Parametrize bulk-delete query-count test by pool size
(cherry picked from commit 91806fd139b61ff07303c9c325d9a421c62ef64d)
Co-authored-by: Colten <[email protected]>
---
.../api_fastapi/core_api/services/public/pools.py | 16 ++++------
.../core_api/routes/public/test_pools.py | 36 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 10 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
index d23b0acc02b..dce54099772 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
@@ -188,7 +188,9 @@ class BulkPoolService(BulkService[PoolBody]):
def handle_bulk_delete(self, action: BulkDeleteAction[PoolBody], results:
BulkActionResponse) -> None:
"""Bulk delete pools."""
to_delete_pool_names = set(action.entities)
- _, matched_pool_names, not_found_pool_names =
self.categorize_pools(to_delete_pool_names)
+ existing_pools_dict, matched_pool_names, not_found_pool_names =
self.categorize_pools(
+ to_delete_pool_names
+ )
try:
if action.action_on_non_existence == BulkActionNotOnExistence.FAIL
and not_found_pool_names:
@@ -196,16 +198,10 @@ class BulkPoolService(BulkService[PoolBody]):
status_code=status.HTTP_404_NOT_FOUND,
detail=f"The pools with these pool names:
{not_found_pool_names} were not found.",
)
- if action.action_on_non_existence == BulkActionNotOnExistence.SKIP:
- delete_pool_names = matched_pool_names
- else:
- delete_pool_names = to_delete_pool_names
- for pool_name in delete_pool_names:
- existing_pool =
self.session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1))
- if existing_pool:
- self.session.delete(existing_pool)
- results.success.append(pool_name)
+ for pool_name in matched_pool_names:
+ self.session.delete(existing_pools_dict[pool_name])
+ results.success.append(pool_name)
except HTTPException as e:
results.errors.append({"error": f"{e.detail}", "status_code":
e.status_code})
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
index 2741d054335..4cd953d9518 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
@@ -25,6 +25,7 @@ from airflow.models.pool import Pool
from airflow.models.team import Team
from airflow.utils.session import provide_session
+from tests_common.test_utils.asserts import count_queries
from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.db import clear_db_pools, clear_db_teams
from tests_common.test_utils.logs import check_last_log
@@ -1133,6 +1134,41 @@ class TestBulkPools(TestPoolsEndpoint):
assert updated_pool.description is None # unchanged
assert updated_pool.include_deferred is True # unchanged
+ @pytest.mark.parametrize(
+ ("pool_count"),
+ [5, 10, 20],
+ )
+ def test_bulk_delete_query_count_is_independent_of_pool_count(self,
test_client, session, pool_count):
+ # Regression guard for the N+1 fix in
BulkPoolService.handle_bulk_delete:
+ # the query count for a bulk delete must be the same regardless of how
+ # many pools are deleted. A regression that re-queries each pool inside
+ # the loop would add one SELECT per pool, so the larger run would issue
+ # strictly more queries than the smaller one.
+
+ EXPECTED_QUERY_COUNT = 4
+
+ pool_names = [f"perf_pool_{pool_count}_{i}" for i in range(pool_count)]
+ session.add_all(Pool(pool=name, slots=1, include_deferred=False) for
name in pool_names)
+ session.commit()
+
+ request_body = {
+ "actions": [{"action": "delete", "entities": pool_names,
"action_on_non_existence": "fail"}]
+ }
+
+ with count_queries() as result:
+ response = test_client.patch("/pools", json=request_body)
+
+ assert response.status_code == 200
+ assert sorted(response.json()["delete"]["success"]) ==
sorted(pool_names)
+ assert
session.scalars(select(Pool).where(Pool.pool.in_(pool_names))).all() == []
+
+ query_count = sum(result.values())
+
+ assert query_count == EXPECTED_QUERY_COUNT, (
+ f"Bulk-delete query count {query_count} does not match expected
{EXPECTED_QUERY_COUNT}. "
+ f"A regression that re-queries pools inside the loop would add one
SELECT per pool."
+ )
+
def test_should_respond_401(self, unauthenticated_test_client):
response = unauthenticated_test_client.patch("/pools", json={})
assert response.status_code == 401