This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 411ddeb5d31 Update bulk API permission check to handle
`action_on_existence` (#56672)
411ddeb5d31 is described below
commit 411ddeb5d310c3d4d0fcaebf941418eef0132568
Author: Vincent <[email protected]>
AuthorDate: Wed Oct 15 10:00:47 2025 -0700
Update bulk API permission check to handle `action_on_existence` (#56672)
---
.../src/airflow/api_fastapi/core_api/security.py | 94 +++++++++++++++-------
.../unit/api_fastapi/core_api/test_security.py | 45 +++++++++++
2 files changed, 108 insertions(+), 31 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py
b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index 50ff119cbbc..1e34efe39fa 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -46,7 +46,14 @@ from
airflow.api_fastapi.auth.managers.models.resource_details import (
VariableDetails,
)
from airflow.api_fastapi.core_api.base import OrmClause
-from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkBody
+from airflow.api_fastapi.core_api.datamodels.common import (
+ BulkAction,
+ BulkActionOnExistence,
+ BulkBody,
+ BulkCreateAction,
+ BulkDeleteAction,
+ BulkUpdateAction,
+)
from airflow.api_fastapi.core_api.datamodels.connections import ConnectionBody
from airflow.api_fastapi.core_api.datamodels.pools import PoolBody
from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
@@ -257,19 +264,22 @@ def requires_access_pool_bulk() ->
Callable[[BulkBody[PoolBody], BaseUser], None
) -> None:
requests: list[IsAuthorizedPoolRequest] = []
for action in request.actions:
- requests.extend(
- [
- {
- "method":
MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
+ methods = _get_resource_methods_from_bulk_request(action)
+ for pool in action.entities:
+ pool_name = (
+ cast("str", pool) if action.action == BulkAction.DELETE
else cast("PoolBody", pool).pool
+ )
+ # For each pool, build a `IsAuthorizedPoolRequest`
+ # The list of `IsAuthorizedPoolRequest` will then be sent
using `batch_is_authorized_pool`
+ # Each `IsAuthorizedPoolRequest` is similar to calling
`is_authorized_pool`
+ for method in methods:
+ req: IsAuthorizedPoolRequest = {
+ "method": method,
"details": PoolDetails(
- name=cast("str", pool)
- if action.action == BulkAction.DELETE
- else cast("PoolBody", pool).pool
+ name=pool_name,
),
}
- for pool in action.entities
- ]
- )
+ requests.append(req)
_requires_access(
is_authorized_callback=lambda:
get_auth_manager().batch_is_authorized_pool(
@@ -307,19 +317,24 @@ def requires_access_connection_bulk() ->
Callable[[BulkBody[ConnectionBody], Bas
) -> None:
requests: list[IsAuthorizedConnectionRequest] = []
for action in request.actions:
- requests.extend(
- [
- {
- "method":
MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
+ methods = _get_resource_methods_from_bulk_request(action)
+ for connection in action.entities:
+ connection_id = (
+ cast("str", connection)
+ if action.action == BulkAction.DELETE
+ else cast("ConnectionBody", connection).connection_id
+ )
+ # For each connection, build a `IsAuthorizedConnectionRequest`
+ # The list of `IsAuthorizedConnectionRequest` will then be
sent using `batch_is_authorized_connection`
+ # Each `IsAuthorizedConnectionRequest` is similar to calling
`is_authorized_connection`
+ for method in methods:
+ req: IsAuthorizedConnectionRequest = {
+ "method": method,
"details": ConnectionDetails(
- conn_id=cast("str", connection)
- if action.action == BulkAction.DELETE
- else cast("ConnectionBody",
connection).connection_id
+ conn_id=connection_id,
),
}
- for connection in action.entities
- ]
- )
+ requests.append(req)
_requires_access(
is_authorized_callback=lambda:
get_auth_manager().batch_is_authorized_connection(
@@ -373,19 +388,24 @@ def requires_access_variable_bulk() ->
Callable[[BulkBody[VariableBody], BaseUse
) -> None:
requests: list[IsAuthorizedVariableRequest] = []
for action in request.actions:
- requests.extend(
- [
- {
- "method":
MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
+ methods = _get_resource_methods_from_bulk_request(action)
+ for variable in action.entities:
+ variable_key = (
+ cast("str", variable)
+ if action.action == BulkAction.DELETE
+ else cast("VariableBody", variable).key
+ )
+ # For each variable, build a `IsAuthorizedVariableRequest`
+ # The list of `IsAuthorizedVariableRequest` will then be sent
using `batch_is_authorized_variable`
+ # Each `IsAuthorizedVariableRequest` is similar to calling
`is_authorized_variable`
+ for method in methods:
+ req: IsAuthorizedVariableRequest = {
+ "method": method,
"details": VariableDetails(
- key=cast("str", entity)
- if action.action == BulkAction.DELETE
- else cast("VariableBody", entity).key
+ key=variable_key,
),
}
- for entity in action.entities
- ]
- )
+ requests.append(req)
_requires_access(
is_authorized_callback=lambda:
get_auth_manager().batch_is_authorized_variable(
@@ -494,3 +514,15 @@ def is_safe_url(target_url: str, request: Request | None =
None) -> bool:
if parsed_target.scheme in {"http", "https"} and parsed_target.netloc
== parsed_base.netloc:
return True
return False
+
+
+def _get_resource_methods_from_bulk_request(
+ action: BulkCreateAction | BulkUpdateAction | BulkDeleteAction,
+) -> list[ResourceMethod]:
+ resource_methods: list[ResourceMethod] =
[MAP_BULK_ACTION_TO_AUTH_METHOD[action.action]]
+ # If ``action_on_existence`` == ``overwrite``, we need to check the user
has ``PUT`` access as well.
+ # With ``action_on_existence`` == ``overwrite``, a create request is
actually an update request if the
+ # resource already exists, hence adding this check.
+ if action.action == BulkAction.CREATE and action.action_on_existence ==
BulkActionOnExistence.OVERWRITE:
+ resource_methods.append("PUT")
+ return resource_methods
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
index 1be6e8b5667..72d89402912 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
@@ -229,6 +229,13 @@ class TestFastApiSecurity:
"action": "delete",
"entities": ["test3"],
},
+ {
+ "action": "create",
+ "entities": [
+ {"connection_id": "test4", "conn_type": "test4"},
+ ],
+ "action_on_existence": "overwrite",
+ },
]
}
)
@@ -249,6 +256,14 @@ class TestFastApiSecurity:
"method": "DELETE",
"details": ConnectionDetails(conn_id="test3"),
},
+ {
+ "method": "POST",
+ "details": ConnectionDetails(conn_id="test4"),
+ },
+ {
+ "method": "PUT",
+ "details": ConnectionDetails(conn_id="test4"),
+ },
],
user=user,
)
@@ -297,6 +312,13 @@ class TestFastApiSecurity:
"action": "delete",
"entities": ["var3"],
},
+ {
+ "action": "create",
+ "entities": [
+ {"key": "var4", "value": "value4"},
+ ],
+ "action_on_existence": "overwrite",
+ },
]
}
)
@@ -317,6 +339,14 @@ class TestFastApiSecurity:
"method": "DELETE",
"details": VariableDetails(key="var3"),
},
+ {
+ "method": "POST",
+ "details": VariableDetails(key="var4"),
+ },
+ {
+ "method": "PUT",
+ "details": VariableDetails(key="var4"),
+ },
],
user=user,
)
@@ -365,6 +395,13 @@ class TestFastApiSecurity:
"action": "delete",
"entities": ["pool3"],
},
+ {
+ "action": "create",
+ "entities": [
+ {"pool": "pool4", "slots": 1},
+ ],
+ "action_on_existence": "overwrite",
+ },
]
}
)
@@ -385,6 +422,14 @@ class TestFastApiSecurity:
"method": "DELETE",
"details": PoolDetails(name="pool3"),
},
+ {
+ "method": "POST",
+ "details": PoolDetails(name="pool4"),
+ },
+ {
+ "method": "PUT",
+ "details": PoolDetails(name="pool4"),
+ },
],
user=user,
)