This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 8cbfbccb529 [v3-1-test] bulk connection var usage (#61570) (#62076)
8cbfbccb529 is described below
commit 8cbfbccb529ef36a8d8e0952e6da48fbb6707beb
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Wed Feb 18 21:34:52 2026 +0100
[v3-1-test] bulk connection var usage (#61570) (#62076)
* bulk connection var usage (#61570)
(cherry picked from commit 206a0f321397a5d0e8782b674d8c7786629a84ee)
* Fix CI
---------
Co-authored-by: Steve Ahn <[email protected]>
---
.../core_api/services/public/connections.py | 16 +++---
.../core_api/routes/public/test_connections.py | 60 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 10 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
index 7cf70935118..bc6e5e49f75 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/connections.py
@@ -135,8 +135,8 @@ class BulkConnectionService(BulkService[ConnectionBody]):
) -> None:
"""Bulk Update connections."""
to_update_connection_ids = {connection.connection_id for connection in action.entities}
- _, matched_connection_ids, not_found_connection_ids = self.categorize_connections(
- to_update_connection_ids
+ existed_connections_dict, matched_connection_ids, not_found_connection_ids = (
+ self.categorize_connections(to_update_connection_ids)
)
try:
@@ -152,9 +152,7 @@ class BulkConnectionService(BulkService[ConnectionBody]):
for connection in action.entities:
if connection.connection_id in update_connection_ids:
- old_connection: Connection = self.session.scalar(
- select(Connection).filter(Connection.conn_id == connection.connection_id).limit(1)
- )
+ old_connection = existed_connections_dict.get(connection.connection_id)
if old_connection is None:
raise ValidationError(
f"The Connection with connection_id: `{connection.connection_id}` was not found"
@@ -175,8 +173,8 @@ class BulkConnectionService(BulkService[ConnectionBody]):
) -> None:
"""Bulk delete connections."""
to_delete_connection_ids = set(action.entities)
- _, matched_connection_ids, not_found_connection_ids = self.categorize_connections(
- to_delete_connection_ids
+ existed_connections_dict, matched_connection_ids, not_found_connection_ids = (
+ self.categorize_connections(to_delete_connection_ids)
)
try:
@@ -191,9 +189,7 @@ class BulkConnectionService(BulkService[ConnectionBody]):
delete_connection_ids = to_delete_connection_ids
for connection_id in delete_connection_ids:
- existing_connection = self.session.scalar(
- select(Connection).where(Connection.conn_id == connection_id).limit(1)
- )
+ existing_connection = existed_connections_dict.get(connection_id)
if existing_connection:
self.session.delete(existing_connection)
results.success.append(connection_id)
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
index 1f1a51a0102..156acd94496 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_connections.py
@@ -24,6 +24,9 @@ from unittest import mock
import pytest
from sqlalchemy import func, select
+from airflow.api_fastapi.core_api.datamodels.common import BulkActionResponse, BulkBody
+from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
+from airflow.api_fastapi.core_api.services.public.connections import BulkConnectionService
from airflow.models import Connection
from airflow.secrets.environment_variables import CONN_ENV_PREFIX
from airflow.utils.session import provide_session
@@ -1399,6 +1402,63 @@ class TestBulkConnections(TestConnectionEndpoint):
)
assert response.status_code == 403
+ def test_bulk_update_avoids_n_plus_one_queries(self, session):
+ self.create_connections()
+ session.expire_all()
+
+ request = BulkBody[ConnectionBody].model_validate(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [
+ {
+ "connection_id": TEST_CONN_ID,
+ "conn_type": TEST_CONN_TYPE,
+ "description": "updated_description",
+ },
+ {
+ "connection_id": TEST_CONN_ID_2,
+ "conn_type": TEST_CONN_TYPE_2,
+ "description": "updated_description_2",
+ },
+ ],
+ "action_on_non_existence": "fail",
+ }
+ ]
+ }
+ )
+ service = BulkConnectionService(session=session, request=request)
+ results = BulkActionResponse()
+
+ with assert_queries_count(1, session=session):
+ service.handle_bulk_update(request.actions[0], results)
+
+ assert sorted(results.success) == [TEST_CONN_ID, TEST_CONN_ID_2]
+
+ def test_bulk_delete_avoids_n_plus_one_queries(self, session):
+ self.create_connections()
+ session.expire_all()
+
+ request = BulkBody[ConnectionBody].model_validate(
+ {
+ "actions": [
+ {
+ "action": "delete",
+ "entities": [TEST_CONN_ID, TEST_CONN_ID_2],
+ "action_on_non_existence": "fail",
+ }
+ ]
+ }
+ )
+ service = BulkConnectionService(session=session, request=request)
+ results = BulkActionResponse()
+
+ with assert_queries_count(1, session=session):
+ service.handle_bulk_delete(request.actions[0], results)
+
+ assert sorted(results.success) == [TEST_CONN_ID, TEST_CONN_ID_2]
+
class TestPostConnectionExtraBackwardCompatibility(TestConnectionEndpoint):
def test_post_should_accept_empty_string_as_extra(self, test_client, session):