This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 411ddeb5d31 Update bulk API permission check to handle 
`action_on_existence` (#56672)
411ddeb5d31 is described below

commit 411ddeb5d310c3d4d0fcaebf941418eef0132568
Author: Vincent <[email protected]>
AuthorDate: Wed Oct 15 10:00:47 2025 -0700

    Update bulk API permission check to handle `action_on_existence` (#56672)
---
 .../src/airflow/api_fastapi/core_api/security.py   | 94 +++++++++++++++-------
 .../unit/api_fastapi/core_api/test_security.py     | 45 +++++++++++
 2 files changed, 108 insertions(+), 31 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py 
b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index 50ff119cbbc..1e34efe39fa 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -46,7 +46,14 @@ from 
airflow.api_fastapi.auth.managers.models.resource_details import (
     VariableDetails,
 )
 from airflow.api_fastapi.core_api.base import OrmClause
-from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkBody
+from airflow.api_fastapi.core_api.datamodels.common import (
+    BulkAction,
+    BulkActionOnExistence,
+    BulkBody,
+    BulkCreateAction,
+    BulkDeleteAction,
+    BulkUpdateAction,
+)
 from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
 from airflow.api_fastapi.core_api.datamodels.pools import PoolBody
 from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
@@ -257,19 +264,22 @@ def requires_access_pool_bulk() -> 
Callable[[BulkBody[PoolBody], BaseUser], None
     ) -> None:
         requests: list[IsAuthorizedPoolRequest] = []
         for action in request.actions:
-            requests.extend(
-                [
-                    {
-                        "method": 
MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
+            methods = _get_resource_methods_from_bulk_request(action)
+            for pool in action.entities:
+                pool_name = (
+                    cast("str", pool) if action.action == BulkAction.DELETE 
else cast("PoolBody", pool).pool
+                )
+                # For each pool, build a `IsAuthorizedPoolRequest`
+                # The list of `IsAuthorizedPoolRequest` will then be sent 
using `batch_is_authorized_pool`
+                # Each `IsAuthorizedPoolRequest` is similar to calling 
`is_authorized_pool`
+                for method in methods:
+                    req: IsAuthorizedPoolRequest = {
+                        "method": method,
                         "details": PoolDetails(
-                            name=cast("str", pool)
-                            if action.action == BulkAction.DELETE
-                            else cast("PoolBody", pool).pool
+                            name=pool_name,
                         ),
                     }
-                    for pool in action.entities
-                ]
-            )
+                    requests.append(req)
 
         _requires_access(
             is_authorized_callback=lambda: 
get_auth_manager().batch_is_authorized_pool(
@@ -307,19 +317,24 @@ def requires_access_connection_bulk() -> 
Callable[[BulkBody[ConnectionBody], Bas
     ) -> None:
         requests: list[IsAuthorizedConnectionRequest] = []
         for action in request.actions:
-            requests.extend(
-                [
-                    {
-                        "method": 
MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
+            methods = _get_resource_methods_from_bulk_request(action)
+            for connection in action.entities:
+                connection_id = (
+                    cast("str", connection)
+                    if action.action == BulkAction.DELETE
+                    else cast("ConnectionBody", connection).connection_id
+                )
+                # For each pool, build a `IsAuthorizedConnectionRequest`
+                # The list of `IsAuthorizedConnectionRequest` will then be 
sent using `batch_is_authorized_connection`
+                # Each `IsAuthorizedConnectionRequest` is similar to calling 
`is_authorized_connection`
+                for method in methods:
+                    req: IsAuthorizedConnectionRequest = {
+                        "method": method,
                         "details": ConnectionDetails(
-                            conn_id=cast("str", connection)
-                            if action.action == BulkAction.DELETE
-                            else cast("ConnectionBody", 
connection).connection_id
+                            conn_id=connection_id,
                         ),
                     }
-                    for connection in action.entities
-                ]
-            )
+                    requests.append(req)
 
         _requires_access(
             is_authorized_callback=lambda: 
get_auth_manager().batch_is_authorized_connection(
@@ -373,19 +388,24 @@ def requires_access_variable_bulk() -> 
Callable[[BulkBody[VariableBody], BaseUse
     ) -> None:
         requests: list[IsAuthorizedVariableRequest] = []
         for action in request.actions:
-            requests.extend(
-                [
-                    {
-                        "method": 
MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
+            methods = _get_resource_methods_from_bulk_request(action)
+            for variable in action.entities:
+                variable_key = (
+                    cast("str", variable)
+                    if action.action == BulkAction.DELETE
+                    else cast("VariableBody", variable).key
+                )
+                # For each variable, build a `IsAuthorizedVariableRequest`
+                # The list of `IsAuthorizedVariableRequest` will then be sent 
using `batch_is_authorized_variable`
+                # Each `IsAuthorizedVariableRequest` is similar to calling 
`is_authorized_variable`
+                for method in methods:
+                    req: IsAuthorizedVariableRequest = {
+                        "method": method,
                         "details": VariableDetails(
-                            key=cast("str", entity)
-                            if action.action == BulkAction.DELETE
-                            else cast("VariableBody", entity).key
+                            key=variable_key,
                         ),
                     }
-                    for entity in action.entities
-                ]
-            )
+                    requests.append(req)
 
         _requires_access(
             is_authorized_callback=lambda: 
get_auth_manager().batch_is_authorized_variable(
@@ -494,3 +514,15 @@ def is_safe_url(target_url: str, request: Request | None = 
None) -> bool:
         if parsed_target.scheme in {"http", "https"} and parsed_target.netloc 
== parsed_base.netloc:
             return True
     return False
+
+
+def _get_resource_methods_from_bulk_request(
+    action: BulkCreateAction | BulkUpdateAction | BulkDeleteAction,
+) -> list[ResourceMethod]:
+    resource_methods: list[ResourceMethod] = 
[MAP_BULK_ACTION_TO_AUTH_METHOD[action.action]]
+    # If ``action_on_existence`` == ``overwrite``, we need to check the user 
has ``PUT`` access as well.
+    # With ``action_on_existence`` == ``overwrite``, a create request is 
actually an update request if the
+    # resource already exists, hence adding this check.
+    if action.action == BulkAction.CREATE and action.action_on_existence == 
BulkActionOnExistence.OVERWRITE:
+        resource_methods.append("PUT")
+    return resource_methods
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py 
b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
index 1be6e8b5667..72d89402912 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
@@ -229,6 +229,13 @@ class TestFastApiSecurity:
                         "action": "delete",
                         "entities": ["test3"],
                     },
+                    {
+                        "action": "create",
+                        "entities": [
+                            {"connection_id": "test4", "conn_type": "test4"},
+                        ],
+                        "action_on_existence": "overwrite",
+                    },
                 ]
             }
         )
@@ -249,6 +256,14 @@ class TestFastApiSecurity:
                     "method": "DELETE",
                     "details": ConnectionDetails(conn_id="test3"),
                 },
+                {
+                    "method": "POST",
+                    "details": ConnectionDetails(conn_id="test4"),
+                },
+                {
+                    "method": "PUT",
+                    "details": ConnectionDetails(conn_id="test4"),
+                },
             ],
             user=user,
         )
@@ -297,6 +312,13 @@ class TestFastApiSecurity:
                         "action": "delete",
                         "entities": ["var3"],
                     },
+                    {
+                        "action": "create",
+                        "entities": [
+                            {"key": "var4", "value": "value4"},
+                        ],
+                        "action_on_existence": "overwrite",
+                    },
                 ]
             }
         )
@@ -317,6 +339,14 @@ class TestFastApiSecurity:
                     "method": "DELETE",
                     "details": VariableDetails(key="var3"),
                 },
+                {
+                    "method": "POST",
+                    "details": VariableDetails(key="var4"),
+                },
+                {
+                    "method": "PUT",
+                    "details": VariableDetails(key="var4"),
+                },
             ],
             user=user,
         )
@@ -365,6 +395,13 @@ class TestFastApiSecurity:
                         "action": "delete",
                         "entities": ["pool3"],
                     },
+                    {
+                        "action": "create",
+                        "entities": [
+                            {"pool": "pool4", "slots": 1},
+                        ],
+                        "action_on_existence": "overwrite",
+                    },
                 ]
             }
         )
@@ -385,6 +422,14 @@ class TestFastApiSecurity:
                     "method": "DELETE",
                     "details": PoolDetails(name="pool3"),
                 },
+                {
+                    "method": "POST",
+                    "details": PoolDetails(name="pool4"),
+                },
+                {
+                    "method": "PUT",
+                    "details": PoolDetails(name="pool4"),
+                },
             ],
             user=user,
         )

Reply via email to