This is an automated email from the ASF dual-hosted git repository.

jason810496 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 91806fd139b Fix N+1 query pattern in bulk pool delete endpoint (#66222)
91806fd139b is described below

commit 91806fd139b61ff07303c9c325d9a421c62ef64d
Author: Colten <[email protected]>
AuthorDate: Mon May 18 19:36:51 2026 +0800

    Fix N+1 query pattern in bulk pool delete endpoint (#66222)
    
    * Fix N+1 query pattern in bulk pool delete endpoint
    
    * Add query-count test for bulk pool delete
    
    * Refactor bulk-delete query-count test to parametrize
    
    * Parametrize bulk-delete query-count test by pool size
---
 .../api_fastapi/core_api/services/public/pools.py  | 16 ++++------
 .../core_api/routes/public/test_pools.py           | 36 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 10 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py 
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
index d23b0acc02b..dce54099772 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py
@@ -188,7 +188,9 @@ class BulkPoolService(BulkService[PoolBody]):
     def handle_bulk_delete(self, action: BulkDeleteAction[PoolBody], results: 
BulkActionResponse) -> None:
         """Bulk delete pools."""
         to_delete_pool_names = set(action.entities)
-        _, matched_pool_names, not_found_pool_names = 
self.categorize_pools(to_delete_pool_names)
+        existing_pools_dict, matched_pool_names, not_found_pool_names = 
self.categorize_pools(
+            to_delete_pool_names
+        )
 
         try:
             if action.action_on_non_existence == BulkActionNotOnExistence.FAIL 
and not_found_pool_names:
@@ -196,16 +198,10 @@ class BulkPoolService(BulkService[PoolBody]):
                     status_code=status.HTTP_404_NOT_FOUND,
                     detail=f"The pools with these pool names: 
{not_found_pool_names} were not found.",
                 )
-            if action.action_on_non_existence == BulkActionNotOnExistence.SKIP:
-                delete_pool_names = matched_pool_names
-            else:
-                delete_pool_names = to_delete_pool_names
 
-            for pool_name in delete_pool_names:
-                existing_pool = 
self.session.scalar(select(Pool).where(Pool.pool == pool_name).limit(1))
-                if existing_pool:
-                    self.session.delete(existing_pool)
-                    results.success.append(pool_name)
+            for pool_name in matched_pool_names:
+                self.session.delete(existing_pools_dict[pool_name])
+                results.success.append(pool_name)
 
         except HTTPException as e:
             results.errors.append({"error": f"{e.detail}", "status_code": 
e.status_code})
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
index 2741d054335..4cd953d9518 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_pools.py
@@ -25,6 +25,7 @@ from airflow.models.pool import Pool
 from airflow.models.team import Team
 from airflow.utils.session import provide_session
 
+from tests_common.test_utils.asserts import count_queries
 from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.db import clear_db_pools, clear_db_teams
 from tests_common.test_utils.logs import check_last_log
@@ -1133,6 +1134,41 @@ class TestBulkPools(TestPoolsEndpoint):
         assert updated_pool.description is None  # unchanged
         assert updated_pool.include_deferred is True  # unchanged
 
+    @pytest.mark.parametrize(
+        ("pool_count"),
+        [5, 10, 20],
+    )
+    def test_bulk_delete_query_count_is_independent_of_pool_count(self, 
test_client, session, pool_count):
+        # Regression guard for the N+1 fix in 
BulkPoolService.handle_bulk_delete:
+        # the query count for a bulk delete must be the same regardless of how
+        # many pools are deleted. A regression that re-queries each pool inside
+        # the loop would add one SELECT per pool, so the larger run would issue
+        # strictly more queries than the smaller one.
+
+        EXPECTED_QUERY_COUNT = 4
+
+        pool_names = [f"perf_pool_{pool_count}_{i}" for i in range(pool_count)]
+        session.add_all(Pool(pool=name, slots=1, include_deferred=False) for 
name in pool_names)
+        session.commit()
+
+        request_body = {
+            "actions": [{"action": "delete", "entities": pool_names, 
"action_on_non_existence": "fail"}]
+        }
+
+        with count_queries() as result:
+            response = test_client.patch("/pools", json=request_body)
+
+        assert response.status_code == 200
+        assert sorted(response.json()["delete"]["success"]) == 
sorted(pool_names)
+        assert 
session.scalars(select(Pool).where(Pool.pool.in_(pool_names))).all() == []
+
+        query_count = sum(result.values())
+
+        assert query_count == EXPECTED_QUERY_COUNT, (
+            f"Bulk-delete query count {query_count} does not match expected 
{EXPECTED_QUERY_COUNT}. "
+            f"A regression that re-queries pools inside the loop would add one 
SELECT per pool."
+        )
+
     def test_should_respond_401(self, unauthenticated_test_client):
         response = unauthenticated_test_client.patch("/pools", json={})
         assert response.status_code == 401

Reply via email to