This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 8cbfbccb529 [v3-1-test] bulk connection var usage (#61570) (#62076)
8cbfbccb529 is described below

commit 8cbfbccb529ef36a8d8e0952e6da48fbb6707beb
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Wed Feb 18 21:34:52 2026 +0100

    [v3-1-test] bulk connection var usage (#61570) (#62076)
    
    * bulk connection var usage (#61570)
    
    (cherry picked from commit 206a0f321397a5d0e8782b674d8c7786629a84ee)
    
    * Fix CI
    
    ---------
    
    Co-authored-by: Steve Ahn <[email protected]>
---
 .../core_api/services/public/connections.py        | 16 +++---
 .../core_api/routes/public/test_connections.py     | 60 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
index 7cf70935118..bc6e5e49f75 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
@@ -135,8 +135,8 @@ class BulkConnectionService(BulkService[ConnectionBody]):
     ) -> None:
         """Bulk Update connections."""
         to_update_connection_ids = {connection.connection_id for connection in action.entities}
-        _, matched_connection_ids, not_found_connection_ids = self.categorize_connections(
-            to_update_connection_ids
+        existed_connections_dict, matched_connection_ids, not_found_connection_ids = (
+            self.categorize_connections(to_update_connection_ids)
         )
 
         try:
@@ -152,9 +152,7 @@ class BulkConnectionService(BulkService[ConnectionBody]):
 
             for connection in action.entities:
                 if connection.connection_id in update_connection_ids:
-                    old_connection: Connection = self.session.scalar(
-                        select(Connection).filter(Connection.conn_id == connection.connection_id).limit(1)
-                    )
+                    old_connection = existed_connections_dict.get(connection.connection_id)
                     if old_connection is None:
                         raise ValidationError(
                             f"The Connection with connection_id: `{connection.connection_id}` was not found"
@@ -175,8 +173,8 @@ class BulkConnectionService(BulkService[ConnectionBody]):
     ) -> None:
         """Bulk delete connections."""
         to_delete_connection_ids = set(action.entities)
-        _, matched_connection_ids, not_found_connection_ids = self.categorize_connections(
-            to_delete_connection_ids
+        existed_connections_dict, matched_connection_ids, not_found_connection_ids = (
+            self.categorize_connections(to_delete_connection_ids)
         )
 
         try:
@@ -191,9 +189,7 @@ class BulkConnectionService(BulkService[ConnectionBody]):
                 delete_connection_ids = to_delete_connection_ids
 
             for connection_id in delete_connection_ids:
-                existing_connection = self.session.scalar(
-                    select(Connection).where(Connection.conn_id == connection_id).limit(1)
-                )
+                existing_connection = existed_connections_dict.get(connection_id)
                 if existing_connection:
                     self.session.delete(existing_connection)
                     results.success.append(connection_id)
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
index 1f1a51a0102..156acd94496 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
@@ -24,6 +24,9 @@ from unittest import mock
 import pytest
 from sqlalchemy import func, select
 
+from airflow.api_fastapi.core_api.datamodels.common import BulkActionResponse, BulkBody
+from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
+from airflow.api_fastapi.core_api.services.public.connections import BulkConnectionService
 from airflow.models import Connection
 from airflow.secrets.environment_variables import CONN_ENV_PREFIX
 from airflow.utils.session import provide_session
@@ -1399,6 +1402,63 @@ class TestBulkConnections(TestConnectionEndpoint):
         )
         assert response.status_code == 403
 
+    def test_bulk_update_avoids_n_plus_one_queries(self, session):
+        self.create_connections()
+        session.expire_all()
+
+        request = BulkBody[ConnectionBody].model_validate(
+            {
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [
+                            {
+                                "connection_id": TEST_CONN_ID,
+                                "conn_type": TEST_CONN_TYPE,
+                                "description": "updated_description",
+                            },
+                            {
+                                "connection_id": TEST_CONN_ID_2,
+                                "conn_type": TEST_CONN_TYPE_2,
+                                "description": "updated_description_2",
+                            },
+                        ],
+                        "action_on_non_existence": "fail",
+                    }
+                ]
+            }
+        )
+        service = BulkConnectionService(session=session, request=request)
+        results = BulkActionResponse()
+
+        with assert_queries_count(1, session=session):
+            service.handle_bulk_update(request.actions[0], results)
+
+        assert sorted(results.success) == [TEST_CONN_ID, TEST_CONN_ID_2]
+
+    def test_bulk_delete_avoids_n_plus_one_queries(self, session):
+        self.create_connections()
+        session.expire_all()
+
+        request = BulkBody[ConnectionBody].model_validate(
+            {
+                "actions": [
+                    {
+                        "action": "delete",
+                        "entities": [TEST_CONN_ID, TEST_CONN_ID_2],
+                        "action_on_non_existence": "fail",
+                    }
+                ]
+            }
+        )
+        service = BulkConnectionService(session=session, request=request)
+        results = BulkActionResponse()
+
+        with assert_queries_count(1, session=session):
+            service.handle_bulk_delete(request.actions[0], results)
+
+        assert sorted(results.success) == [TEST_CONN_ID, TEST_CONN_ID_2]
+
 
 class TestPostConnectionExtraBackwardCompatibility(TestConnectionEndpoint):
     def test_post_should_accept_empty_string_as_extra(self, test_client, session):

Reply via email to