This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a5684bf8d03 Check destination team permission when using bulk APIs with connections, variables and pools (#68573)
a5684bf8d03 is described below

commit a5684bf8d03c311b91df543393692bbf367536c5
Author: Vincent <[email protected]>
AuthorDate: Mon Jun 15 12:23:24 2026 -0400

    Check destination team permission when using bulk APIs with connections, variables and pools (#68573)
---
 .../src/airflow/api_fastapi/core_api/security.py   | 55 +++++++++---
 .../unit/api_fastapi/core_api/test_security.py     | 99 ++++++++++++++++++++++
 2 files changed, 142 insertions(+), 12 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index 8a009231f12..76f29220309 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -441,6 +441,7 @@ def requires_access_pool_bulk() -> 
Callable[[BulkBody[PoolBody], BaseUser], None
         request: BulkBody[PoolBody],
         user: GetUserDep,
     ) -> None:
+        multi_team = conf.getboolean("core", "multi_team")
         # Build the list of pool names provided as part of the request that may correspond to
         # an existing resource (UPDATE / DELETE, or CREATE+OVERWRITE which may turn into a PUT).
         existing_pool_names = [
@@ -459,9 +460,6 @@ def requires_access_pool_bulk() -> 
Callable[[BulkBody[PoolBody], BaseUser], None
                 pool_name = (
                     cast("str", pool) if action.action == BulkAction.DELETE 
else cast("PoolBody", pool).pool
                 )
-                # For each pool, build a `IsAuthorizedPoolRequest`
-                # The list of `IsAuthorizedPoolRequest` will then be sent 
using `batch_is_authorized_pool`
-                # Each `IsAuthorizedPoolRequest` is similar to calling 
`is_authorized_pool`
                 for method in methods:
                     req: IsAuthorizedPoolRequest = {
                         "method": method,
@@ -471,9 +469,19 @@ def requires_access_pool_bulk() -> 
Callable[[BulkBody[PoolBody], BaseUser], None
                         ),
                     }
                     requests.append(req)
+                # Authorize the destination team_name when the entity body requests a team change.
+                if multi_team and _bulk_action_sets_team(action):
+                    dest_team = cast("PoolBody", pool).team_name
+                    if dest_team is not None and dest_team != 
pool_name_to_team.get(pool_name):
+                        for method in methods:
+                            requests.append(
+                                {
+                                    "method": method,
+                                    "details": PoolDetails(name=pool_name, 
team_name=dest_team),
+                                }
+                            )
 
         _requires_access(
-            # By calling `batch_is_authorized_pool`, we check the user has 
access to all pools provided in the request
             is_authorized_callback=lambda: 
get_auth_manager().batch_is_authorized_pool(
                 requests=requests,
                 user=user,
@@ -543,6 +551,7 @@ def requires_access_connection_bulk() -> 
Callable[[BulkBody[ConnectionBody], Bas
         request: BulkBody[ConnectionBody],
         user: GetUserDep,
     ) -> None:
+        multi_team = conf.getboolean("core", "multi_team")
         # Build the list of ``conn_id`` provided as part of the request that may correspond to
         # an existing resource (UPDATE / DELETE, or CREATE+OVERWRITE which may turn into a PUT).
         existing_connection_ids = [
@@ -565,9 +574,6 @@ def requires_access_connection_bulk() -> 
Callable[[BulkBody[ConnectionBody], Bas
                     if action.action == BulkAction.DELETE
                     else cast("ConnectionBody", connection).connection_id
                 )
-                # For each pool, build a `IsAuthorizedConnectionRequest`
-                # The list of `IsAuthorizedConnectionRequest` will then be 
sent using `batch_is_authorized_connection`
-                # Each `IsAuthorizedConnectionRequest` is similar to calling 
`is_authorized_connection`
                 for method in methods:
                     req: IsAuthorizedConnectionRequest = {
                         "method": method,
@@ -577,9 +583,19 @@ def requires_access_connection_bulk() -> 
Callable[[BulkBody[ConnectionBody], Bas
                         ),
                     }
                     requests.append(req)
+                # Authorize the destination team_name when the entity body requests a team change.
+                if multi_team and _bulk_action_sets_team(action):
+                    dest_team = cast("ConnectionBody", connection).team_name
+                    if dest_team is not None and dest_team != 
conn_id_to_team.get(connection_id):
+                        for method in methods:
+                            requests.append(
+                                {
+                                    "method": method,
+                                    "details": 
ConnectionDetails(conn_id=connection_id, team_name=dest_team),
+                                }
+                            )
 
         _requires_access(
-            # By calling `batch_is_authorized_connection`, we check the user 
has access to all connections provided in the request
             is_authorized_callback=lambda: 
get_auth_manager().batch_is_authorized_connection(
                 requests=requests,
                 user=user,
@@ -686,6 +702,7 @@ def requires_access_variable_bulk() -> 
Callable[[BulkBody[VariableBody], BaseUse
         request: BulkBody[VariableBody],
         user: GetUserDep,
     ) -> None:
+        multi_team = conf.getboolean("core", "multi_team")
         # Build the list of variable keys provided as part of the request that may correspond to
         # an existing resource (UPDATE / DELETE, or CREATE+OVERWRITE which may turn into a PUT).
         existing_variable_keys = [
@@ -706,9 +723,6 @@ def requires_access_variable_bulk() -> 
Callable[[BulkBody[VariableBody], BaseUse
                     if action.action == BulkAction.DELETE
                     else cast("VariableBody", variable).key
                 )
-                # For each variable, build a `IsAuthorizedVariableRequest`
-                # The list of `IsAuthorizedVariableRequest` will then be sent 
using `batch_is_authorized_variable`
-                # Each `IsAuthorizedVariableRequest` is similar to calling 
`is_authorized_variable`
                 for method in methods:
                     req: IsAuthorizedVariableRequest = {
                         "method": method,
@@ -718,9 +732,19 @@ def requires_access_variable_bulk() -> 
Callable[[BulkBody[VariableBody], BaseUse
                         ),
                     }
                     requests.append(req)
+                # Authorize the destination team_name when the entity body requests a team change.
+                if multi_team and _bulk_action_sets_team(action):
+                    dest_team = cast("VariableBody", variable).team_name
+                    if dest_team is not None and dest_team != 
var_key_to_team.get(variable_key):
+                        for method in methods:
+                            requests.append(
+                                {
+                                    "method": method,
+                                    "details": 
VariableDetails(key=variable_key, team_name=dest_team),
+                                }
+                            )
 
         _requires_access(
-            # By calling `batch_is_authorized_variable`, we check the user has 
access to all variables provided in the request
             is_authorized_callback=lambda: 
get_auth_manager().batch_is_authorized_variable(
                 requests=requests,
                 user=user,
@@ -981,3 +1005,10 @@ def _bulk_action_needs_existing_team_lookup(
     if action.action != BulkAction.CREATE:
         return True
     return action.action_on_existence == BulkActionOnExistence.OVERWRITE
+
+
+def _bulk_action_sets_team(
+    action: BulkCreateAction | BulkUpdateAction | BulkDeleteAction,
+) -> bool:
+    """Return True if this action can write a team_name (UPDATE, or CREATE that carries a body)."""
+    return action.action in (BulkAction.UPDATE, BulkAction.CREATE)
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
index e916bc19e5d..d7614f82539 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
@@ -1266,6 +1266,105 @@ class TestFastApiSecurity:
             user=user,
         )
 
+    @patch.object(Pool, "get_name_to_team_name_mapping")
+    @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_requires_access_pool_bulk_checks_destination_team(
+        self, mock_get_auth_manager, mock_get_name_to_team_name_mapping
+    ):
+        """Bulk UPDATE that changes team_name must authorize the destination team."""
+        auth_manager = Mock()
+        auth_manager.batch_is_authorized_pool.return_value = True
+        mock_get_auth_manager.return_value = auth_manager
+        mock_get_name_to_team_name_mapping.return_value = {"pool1": "team_b"}
+
+        request = BulkBody[PoolBody].model_validate(
+            {
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [{"pool": "pool1", "slots": 5, 
"team_name": "team_a"}],
+                    },
+                ]
+            }
+        )
+        user = Mock()
+        requires_access_pool_bulk()(request, user)
+
+        auth_manager.batch_is_authorized_pool.assert_called_once_with(
+            requests=[
+                {"method": "PUT", "details": PoolDetails(name="pool1", 
team_name="team_b")},
+                {"method": "PUT", "details": PoolDetails(name="pool1", 
team_name="team_a")},
+            ],
+            user=user,
+        )
+
+    @patch.object(Connection, "get_conn_id_to_team_name_mapping")
+    @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_requires_access_connection_bulk_checks_destination_team(
+        self, mock_get_auth_manager, mock_get_conn_id_to_team_name_mapping
+    ):
+        """Bulk UPDATE that changes team_name must authorize the destination team."""
+        auth_manager = Mock()
+        auth_manager.batch_is_authorized_connection.return_value = True
+        mock_get_auth_manager.return_value = auth_manager
+        mock_get_conn_id_to_team_name_mapping.return_value = {"conn1": 
"team_b"}
+
+        request = BulkBody[ConnectionBody].model_validate(
+            {
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [{"connection_id": "conn1", "conn_type": 
"http", "team_name": "team_a"}],
+                    },
+                ]
+            }
+        )
+        user = Mock()
+        requires_access_connection_bulk()(request, user)
+
+        auth_manager.batch_is_authorized_connection.assert_called_once_with(
+            requests=[
+                {"method": "PUT", "details": 
ConnectionDetails(conn_id="conn1", team_name="team_b")},
+                {"method": "PUT", "details": 
ConnectionDetails(conn_id="conn1", team_name="team_a")},
+            ],
+            user=user,
+        )
+
+    @patch.object(Variable, "get_key_to_team_name_mapping")
+    @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+    @conf_vars({("core", "multi_team"): "True"})
+    def test_requires_access_variable_bulk_checks_destination_team(
+        self, mock_get_auth_manager, mock_get_key_to_team_name_mapping
+    ):
+        """Bulk UPDATE that changes team_name must authorize the destination team."""
+        auth_manager = Mock()
+        auth_manager.batch_is_authorized_variable.return_value = True
+        mock_get_auth_manager.return_value = auth_manager
+        mock_get_key_to_team_name_mapping.return_value = {"var1": "team_b"}
+
+        request = BulkBody[VariableBody].model_validate(
+            {
+                "actions": [
+                    {
+                        "action": "update",
+                        "entities": [{"key": "var1", "value": "val", 
"team_name": "team_a"}],
+                    },
+                ]
+            }
+        )
+        user = Mock()
+        requires_access_variable_bulk()(request, user)
+
+        auth_manager.batch_is_authorized_variable.assert_called_once_with(
+            requests=[
+                {"method": "PUT", "details": VariableDetails(key="var1", 
team_name="team_b")},
+                {"method": "PUT", "details": VariableDetails(key="var1", 
team_name="team_a")},
+            ],
+            user=user,
+        )
+
 
 class TestAuthManagerDependency:
     """Test the auth_manager_from_app dependency function."""

Reply via email to