This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9b48498f103 Deprecate BackfillDetails and use DagAccessEntity.Run for
backfill p… (#61400)
9b48498f103 is described below
commit 9b48498f10386f9a0775f8c38b460c0b032fbbe2
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Wed Feb 4 16:11:03 2026 +0100
Deprecate BackfillDetails and use DagAccessEntity.Run for backfill p…
(#61400)
---
.../docs/core-concepts/auth-manager/index.rst | 1 -
airflow-core/newsfragments/61400.significant.rst | 20 ++
.../api_fastapi/auth/managers/base_auth_manager.py | 17 --
.../auth/managers/models/resource_details.py | 7 +-
.../auth/managers/simple/simple_auth_manager.py | 16 +-
.../core_api/routes/public/backfills.py | 8 +-
.../api_fastapi/core_api/routes/ui/backfills.py | 3 +-
.../src/airflow/api_fastapi/core_api/security.py | 59 ++++--
.../managers/simple/test_simple_auth_manager.py | 3 -
.../auth/managers/test_base_auth_manager.py | 10 -
.../core_api/routes/public/test_backfills.py | 2 +-
.../unit/api_fastapi/core_api/test_security.py | 225 +++++++++++++++++++--
12 files changed, 285 insertions(+), 86 deletions(-)
diff --git a/airflow-core/docs/core-concepts/auth-manager/index.rst
b/airflow-core/docs/core-concepts/auth-manager/index.rst
index 71db3833030..2e787c9e7fb 100644
--- a/airflow-core/docs/core-concepts/auth-manager/index.rst
+++ b/airflow-core/docs/core-concepts/auth-manager/index.rst
@@ -136,7 +136,6 @@ These authorization methods are:
Also, ``is_authorized_dag`` is called for any entity related to Dags (e.g.
task instances, Dag runs, ...). This information is passed in ``access_entity``.
Example: ``auth_manager.is_authorized_dag(method="GET",
access_entity=DagAccessEntity.Run, details=DagDetails(id="dag-1"))`` asks
whether the user has permission to read the Dag runs of the Dag "dag-1".
-* ``is_authorized_backfill``: Return whether the user is authorized to access
Airflow backfills. Some details about the backfill can be provided (e.g. the
backfill ID).
* ``is_authorized_asset``: Return whether the user is authorized to access
Airflow assets. Some details about the asset can be provided (e.g. the asset
ID).
* ``is_authorized_asset_alias``: Return whether the user is authorized to
access Airflow asset aliases. Some details about the asset alias can be
provided (e.g. the asset alias ID).
* ``is_authorized_pool``: Return whether the user is authorized to access
Airflow pools. Some details about the pool can be provided (e.g. the pool name).
diff --git a/airflow-core/newsfragments/61400.significant.rst
b/airflow-core/newsfragments/61400.significant.rst
new file mode 100644
index 00000000000..7bdee4390a7
--- /dev/null
+++ b/airflow-core/newsfragments/61400.significant.rst
@@ -0,0 +1,20 @@
+AuthManager Backfill permissions are now handled by
``requires_access_dag`` with the ``DagAccessEntity.RUN`` access entity
+
+``is_authorized_backfill`` of the ``BaseAuthManager`` interface has been
removed. Core will no longer call this method, and its
+provider counterpart implementations will be marked as deprecated.
+Permissions for backfill operations are now checked against the
``DagAccessEntity.RUN`` permission using the existing
+``requires_access_dag`` decorator. In other words, if a user has the required
permission on the Dag runs of a Dag, they can perform the corresponding backfill operations on it.
+
+Please update your security policies to ensure that users who need to perform
backfill operations have the appropriate ``DagAccessEntity.RUN`` permissions.
(Users
+who have the Backfill permissions but not the DagRun ones will no longer
be able to perform backfill operations until their permissions are updated.)
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [x] API changes
+ * [ ] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes
diff --git
a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
index 8f9f1dd8e00..62cfbd72485 100644
--- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
+++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
@@ -29,7 +29,6 @@ from sqlalchemy import select
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.auth.managers.models.resource_details import (
- BackfillDetails,
ConnectionDetails,
DagDetails,
PoolDetails,
@@ -232,22 +231,6 @@ class BaseAuthManager(Generic[T], LoggingMixin,
metaclass=ABCMeta):
:param details: optional details about the DAG
"""
- @abstractmethod
- def is_authorized_backfill(
- self,
- *,
- method: ResourceMethod,
- user: T,
- details: BackfillDetails | None = None,
- ) -> bool:
- """
- Return whether the user is authorized to perform a given action on a
backfill.
-
- :param method: the method to perform
- :param user: the user to performing the action
- :param details: optional details about the backfill
- """
-
@abstractmethod
def is_authorized_asset(
self,
diff --git
a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
index 6a0041570a1..4ff380d2aed 100644
---
a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
+++
b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
@@ -48,7 +48,12 @@ class DagDetails:
@dataclass
class BackfillDetails:
- """Represents the details of a backfill."""
+ """
+ Represents the details of a backfill.
+
+ .. deprecated:: 3.1.8
+ Use DagAccessEntity.Run instead for a dag level access control.
+ """
id: NonNegativeInt | None = None
diff --git
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
index f9f9c395e3f..1bdbcc0a3a2 100644
---
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
+++
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
@@ -36,7 +36,7 @@ from termcolor import colored
from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX
from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
-from airflow.api_fastapi.auth.managers.models.resource_details import
BackfillDetails, TeamDetails
+from airflow.api_fastapi.auth.managers.models.resource_details import
TeamDetails
from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
from airflow.api_fastapi.common.types import MenuItem
from airflow.configuration import AIRFLOW_HOME, conf
@@ -194,20 +194,6 @@ class
SimpleAuthManager(BaseAuthManager[SimpleAuthManagerUser]):
user=user,
)
- def is_authorized_backfill(
- self,
- *,
- method: ResourceMethod,
- user: SimpleAuthManagerUser,
- details: BackfillDetails | None = None,
- ) -> bool:
- return self._is_authorized(
- method=method,
- allow_get_role=SimpleAuthManagerRole.VIEWER,
- allow_role=SimpleAuthManagerRole.OP,
- user=user,
- )
-
def is_authorized_asset(
self,
*,
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
index adbac360247..b106248f136 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -25,7 +25,6 @@ from sqlalchemy import select, update
from sqlalchemy.orm import joinedload
from airflow._shared.timezones import timezone
-from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
from airflow.api_fastapi.common.db.common import (
SessionDep,
paginated_select,
@@ -42,7 +41,7 @@ from airflow.api_fastapi.core_api.datamodels.backfills import
(
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
-from airflow.api_fastapi.core_api.security import GetUserDep,
requires_access_backfill, requires_access_dag
+from airflow.api_fastapi.core_api.security import GetUserDep,
requires_access_backfill
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import DagNotFound
from airflow.models import DagRun
@@ -121,7 +120,6 @@ def get_backfill(
dependencies=[
Depends(action_logging()),
Depends(requires_access_backfill(method="PUT")),
- Depends(requires_access_dag(method="PUT",
access_entity=DagAccessEntity.RUN)),
],
)
def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) ->
BackfillResponse:
@@ -149,7 +147,6 @@ def pause_backfill(backfill_id: NonNegativeInt, session:
SessionDep) -> Backfill
dependencies=[
Depends(action_logging()),
Depends(requires_access_backfill(method="PUT")),
- Depends(requires_access_dag(method="PUT",
access_entity=DagAccessEntity.RUN)),
],
)
def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) ->
BackfillResponse:
@@ -175,7 +172,6 @@ def unpause_backfill(backfill_id: NonNegativeInt, session:
SessionDep) -> Backfi
),
dependencies=[
Depends(action_logging()),
- Depends(requires_access_dag(method="PUT",
access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="PUT")),
],
)
@@ -222,7 +218,6 @@ def cancel_backfill(backfill_id: NonNegativeInt, session:
SessionDep) -> Backfil
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT]),
dependencies=[
Depends(action_logging()),
- Depends(requires_access_dag(method="POST",
access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="POST")),
],
)
@@ -270,7 +265,6 @@ def create_backfill(
path="/dry_run",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT]),
dependencies=[
- Depends(requires_access_dag(method="POST",
access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="POST")),
],
)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
index c4c7ffe79a7..32b2891b954 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
@@ -36,7 +36,7 @@ from airflow.api_fastapi.core_api.datamodels.backfills import
BackfillCollection
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
-from airflow.api_fastapi.core_api.security import requires_access_backfill,
requires_access_dag
+from airflow.api_fastapi.core_api.security import requires_access_backfill
from airflow.models.backfill import Backfill
backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills")
@@ -47,7 +47,6 @@ backfills_router = AirflowRouter(tags=["Backfill"],
prefix="/backfills")
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[
Depends(requires_access_backfill(method="GET")),
- Depends(requires_access_dag(method="GET")),
],
)
def list_backfills_ui(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py
b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index 875513c8b2e..25b0a15a3a5 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -16,16 +16,17 @@
# under the License.
from __future__ import annotations
-from collections.abc import Callable
+from collections.abc import Callable, Coroutine
+from json import JSONDecodeError
from pathlib import Path
-from typing import TYPE_CHECKING, Annotated, cast
+from typing import TYPE_CHECKING, Annotated, Any, cast
from urllib.parse import ParseResult, unquote, urljoin, urlparse
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer,
OAuth2PasswordBearer
from jwt import ExpiredSignatureError, InvalidTokenError
-from pydantic import NonNegativeInt
-from sqlalchemy import or_
+from sqlalchemy import or_, select
+from sqlalchemy.orm import Session
from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.base_auth_manager import (
@@ -42,7 +43,6 @@ from
airflow.api_fastapi.auth.managers.models.resource_details import (
AccessView,
AssetAliasDetails,
AssetDetails,
- BackfillDetails,
ConfigurationDetails,
ConnectionDetails,
DagAccessEntity,
@@ -50,6 +50,7 @@ from
airflow.api_fastapi.auth.managers.models.resource_details import (
PoolDetails,
VariableDetails,
)
+from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.core_api.base import OrmClause
from airflow.api_fastapi.core_api.datamodels.common import (
BulkAction,
@@ -64,6 +65,7 @@ from airflow.api_fastapi.core_api.datamodels.pools import
PoolBody
from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
from airflow.configuration import conf
from airflow.models import Connection, Pool, Variable
+from airflow.models.backfill import Backfill
from airflow.models.dag import DagModel, DagRun, DagTag
from airflow.models.dagwarning import DagWarning
from airflow.models.log import Log
@@ -146,14 +148,21 @@ GetUserDep = Annotated[BaseUser, Depends(get_user)]
def requires_access_dag(
- method: ResourceMethod, access_entity: DagAccessEntity | None = None
+ method: ResourceMethod,
+ access_entity: DagAccessEntity | None = None,
+ param_dag_id: str | None = None,
) -> Callable[[Request, BaseUser], None]:
def inner(
request: Request,
user: GetUserDep,
) -> None:
- dag_id = request.path_params.get("dag_id") or
request.query_params.get("dag_id")
- dag_id = dag_id if dag_id != "~" else None
+ # Required for the closure to capture the dag_id but still be able to
mutate it.
+ # Prevent from using a nonlocal statement causing test failures.
+ dag_id = param_dag_id
+ if dag_id is None:
+ dag_id = request.path_params.get("dag_id") or
request.query_params.get("dag_id")
+ dag_id = dag_id if dag_id != "~" else None
+
team_name = DagModel.get_team_name(dag_id) if dag_id else None
_requires_access(
@@ -263,17 +272,35 @@ ReadableTagsFilterDep = Annotated[
]
-def requires_access_backfill(method: ResourceMethod) -> Callable[[Request,
BaseUser], None]:
- def inner(
+def requires_access_backfill(
+ method: ResourceMethod,
+) -> Callable[[Request, BaseUser, Session], Coroutine[Any, Any, None]]:
+ """Wrap ``requires_access_dag`` and extract the dag_id from the
backfill_id."""
+
+ async def inner(
request: Request,
user: GetUserDep,
+ session: SessionDep,
) -> None:
- backfill_id: NonNegativeInt | None =
request.path_params.get("backfill_id")
-
- _requires_access(
- is_authorized_callback=lambda:
get_auth_manager().is_authorized_backfill(
- method=method, details=BackfillDetails(id=backfill_id),
user=user
- ),
+ dag_id = None
+
+ # Try to retrieve the dag_id from the backfill_id path param
+ backfill_id = request.path_params.get("backfill_id")
+ if backfill_id is not None and isinstance(backfill_id, int):
+ backfill = session.scalars(select(Backfill).where(Backfill.id ==
backfill_id)).one_or_none()
+ dag_id = backfill.dag_id if backfill else None
+
+ # Try to retrieve the dag_id from the request body (POST backfill)
+ if dag_id is None:
+ try:
+ dag_id = (await request.json()).get("dag_id")
+ except JSONDecodeError:
+ # Not a json body, ignore
+ pass
+
+ requires_access_dag(method, DagAccessEntity.RUN, dag_id)(
+ request,
+ user,
)
return inner
diff --git
a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
index 2c67f79a5b8..1356a9fe1e8 100644
---
a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
+++
b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
@@ -135,7 +135,6 @@ class TestSimpleAuthManager:
"is_authorized_dag",
"is_authorized_asset",
"is_authorized_asset_alias",
- "is_authorized_backfill",
"is_authorized_pool",
"is_authorized_variable",
],
@@ -191,7 +190,6 @@ class TestSimpleAuthManager:
"is_authorized_connection",
"is_authorized_asset",
"is_authorized_asset_alias",
- "is_authorized_backfill",
"is_authorized_pool",
"is_authorized_variable",
],
@@ -237,7 +235,6 @@ class TestSimpleAuthManager:
"is_authorized_dag",
"is_authorized_asset",
"is_authorized_asset_alias",
- "is_authorized_backfill",
"is_authorized_pool",
],
)
diff --git
a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
index e719ff4836b..71769ef49a6 100644
---
a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
+++
b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
@@ -25,7 +25,6 @@ from jwt import InvalidTokenError
from airflow.api_fastapi.auth.managers.base_auth_manager import
BaseAuthManager, T
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.auth.managers.models.resource_details import (
- BackfillDetails,
ConnectionDetails,
DagDetails,
PoolDetails,
@@ -92,15 +91,6 @@ class
EmptyAuthManager(BaseAuthManager[BaseAuthManagerUserTest]):
) -> bool:
raise NotImplementedError()
- def is_authorized_backfill(
- self,
- *,
- method: ResourceMethod,
- details: BackfillDetails | None = None,
- user: BaseAuthManagerUserTest | None = None,
- ) -> bool:
- raise NotImplementedError()
-
def is_authorized_asset(
self,
*,
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
index ade10293ef6..c55e284de62 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
@@ -126,7 +126,7 @@ class TestListBackfills(TestBackfillEndpoint):
session.add(b)
session.commit()
- with assert_queries_count(2):
+ with assert_queries_count(3):
response = test_client.get(f"/backfills?dag_id={dag.dag_id}")
assert response.status_code == 200
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
index 5aaf509b66b..8773d882d93 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
@@ -27,6 +27,7 @@ from airflow.api_fastapi.auth.managers.base_auth_manager
import COOKIE_NAME_JWT_
from airflow.api_fastapi.auth.managers.models.resource_details import (
ConnectionDetails,
DagAccessEntity,
+ DagDetails,
PoolDetails,
VariableDetails,
)
@@ -38,6 +39,7 @@ from airflow.api_fastapi.core_api.datamodels.variables import
VariableBody
from airflow.api_fastapi.core_api.security import (
get_user,
is_safe_url,
+ requires_access_backfill,
requires_access_connection,
requires_access_connection_bulk,
requires_access_dag,
@@ -48,6 +50,7 @@ from airflow.api_fastapi.core_api.security import (
resolve_user_from_token,
)
from airflow.models import Connection, Pool, Variable
+from airflow.models.dag import DagModel
from tests_common.test_utils.config import conf_vars
@@ -147,37 +150,233 @@ class TestFastApiSecurity:
mock_resolve_user_from_token.assert_called_once_with(expected)
@pytest.mark.db_test
+ @pytest.mark.parametrize(
+ ("param_dag_id", "path_params", "query_params", "expected_dag_id"),
+ [
+ # param_dag_id takes precedence when provided
+ ("dag_id_from_param", {}, {}, "dag_id_from_param"),
+ (
+ "dag_id_from_param",
+ {"dag_id": "dag_id_from_path_params"},
+ {"dag_id": "dag_id_from_query_params"},
+ "dag_id_from_param",
+ ),
+ # path_params used when param_dag_id is None
+ (None, {"dag_id": "dag_id_from_path_params"}, {},
"dag_id_from_path_params"),
+ (
+ None,
+ {"dag_id": "dag_id_from_path_params"},
+ {"dag_id": "dag_id_from_query_params"},
+ "dag_id_from_path_params",
+ ),
+ # query_params used when param_dag_id and path_params have no
dag_id
+ (None, {}, {"dag_id": "dag_id_from_query_params"},
"dag_id_from_query_params"),
+ # "~" is treated as None
+ (None, {"dag_id": "~"}, {}, None),
+ (None, {}, {"dag_id": "~"}, None),
+ # No source → dag_id None
+ (None, {}, {}, None),
+ ],
+ )
+ @patch.object(DagModel, "get_team_name")
@patch("airflow.api_fastapi.core_api.security.get_auth_manager")
- def test_requires_access_dag_authorized(self, mock_get_auth_manager):
+ def test_requires_access_dag_authorized(
+ self,
+ mock_get_auth_manager,
+ mock_get_team_name,
+ param_dag_id,
+ path_params,
+ query_params,
+ expected_dag_id,
+ ):
auth_manager = Mock()
auth_manager.is_authorized_dag.return_value = True
mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1" if expected_dag_id else None
+
fastapi_request = Mock()
- fastapi_request.path_params = {}
- fastapi_request.query_params = {}
+ fastapi_request.path_params = path_params
+ fastapi_request.query_params = query_params
+ user = Mock()
- requires_access_dag("GET", DagAccessEntity.CODE)(fastapi_request,
Mock())
+ requires_access_dag("GET", DagAccessEntity.CODE,
param_dag_id=param_dag_id)(fastapi_request, user)
- auth_manager.is_authorized_dag.assert_called_once()
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="GET",
+ access_entity=DagAccessEntity.CODE,
+ details=DagDetails(id=expected_dag_id,
team_name=mock_get_team_name.return_value),
+ user=user,
+ )
@pytest.mark.db_test
+ @pytest.mark.parametrize(
+ ("param_dag_id", "path_params", "query_params", "expected_dag_id"),
+ [
+ ("dag_id_from_param", {}, {}, "dag_id_from_param"),
+ (None, {"dag_id": "dag_id_from_path_params"}, {},
"dag_id_from_path_params"),
+ (None, {}, {"dag_id": "dag_id_from_query_params"},
"dag_id_from_query_params"),
+ (None, {}, {}, None),
+ ],
+ )
+ @patch.object(DagModel, "get_team_name")
@patch("airflow.api_fastapi.core_api.security.get_auth_manager")
- def test_requires_access_dag_unauthorized(self, mock_get_auth_manager):
+ def test_requires_access_dag_unauthorized(
+ self,
+ mock_get_auth_manager,
+ mock_get_team_name,
+ param_dag_id,
+ path_params,
+ query_params,
+ expected_dag_id,
+ ):
auth_manager = Mock()
auth_manager.is_authorized_dag.return_value = False
mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1" if expected_dag_id else None
+
fastapi_request = Mock()
- fastapi_request.path_params = {}
- fastapi_request.query_params = {}
+ fastapi_request.path_params = path_params
+ fastapi_request.query_params = query_params
+ user = Mock()
- mock_request = Mock()
- mock_request.path_params.return_value = {}
- mock_request.query_params.return_value = {}
+ with pytest.raises(HTTPException, match="Forbidden"):
+ requires_access_dag("GET", DagAccessEntity.CODE,
param_dag_id=param_dag_id)(fastapi_request, user)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="GET",
+ access_entity=DagAccessEntity.CODE,
+ details=DagDetails(id=expected_dag_id,
team_name=mock_get_team_name.return_value),
+ user=user,
+ )
+
+ @pytest.mark.db_test
+ @pytest.mark.asyncio
+ @patch.object(DagModel, "get_team_name")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ async def test_requires_access_backfill_authorized_from_path(
+ self, mock_get_auth_manager, mock_get_team_name
+ ):
+ """When backfill_id is in path and Backfill exists, dag_id from
backfill is used."""
+ auth_manager = Mock()
+ auth_manager.is_authorized_dag.return_value = True
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1"
+
+ backfill = Mock()
+ backfill.dag_id = "backfill_dag_id"
+ session = Mock()
+ session.scalars.return_value.one_or_none.return_value = backfill
+
+ request = Mock()
+ request.path_params = {"backfill_id": 42}
+ request.json = AsyncMock(return_value={})
+
+ user = Mock()
+
+ inner = requires_access_backfill("PUT")
+ await inner(request, user, session)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="PUT",
+ access_entity=DagAccessEntity.RUN,
+ details=DagDetails(id="backfill_dag_id", team_name="team1"),
+ user=user,
+ )
+
+ @pytest.mark.db_test
+ @pytest.mark.asyncio
+ @patch.object(DagModel, "get_team_name")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ async def test_requires_access_backfill_authorized_from_body(
+ self, mock_get_auth_manager, mock_get_team_name
+ ):
+ """When backfill_id is missing or not int, dag_id can come from
request body (POST backfill)."""
+ auth_manager = Mock()
+ auth_manager.is_authorized_dag.return_value = True
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1"
+
+ session = Mock()
+ session.scalars.return_value.one_or_none.return_value = None
+
+ request = Mock()
+ request.path_params = {}
+ request.json = AsyncMock(return_value={"dag_id": "body_dag_id"})
+
+ user = Mock()
+
+ inner = requires_access_backfill("POST")
+ await inner(request, user, session)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="POST",
+ access_entity=DagAccessEntity.RUN,
+ details=DagDetails(id="body_dag_id", team_name="team1"),
+ user=user,
+ )
+
+ @pytest.mark.db_test
+ @pytest.mark.asyncio
+ @patch.object(DagModel, "get_team_name")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ async def test_requires_access_backfill_unauthorized(self,
mock_get_auth_manager, mock_get_team_name):
+ """When is_authorized_dag returns False, Forbidden is raised."""
+ auth_manager = Mock()
+ auth_manager.is_authorized_dag.return_value = False
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = None
+
+ backfill = Mock()
+ backfill.dag_id = "unauthorized_dag"
+ session = Mock()
+ session.scalars.return_value.one_or_none.return_value = backfill
+
+ request = Mock()
+ request.path_params = {"backfill_id": 1}
+ user = Mock()
+ inner = requires_access_backfill("GET")
with pytest.raises(HTTPException, match="Forbidden"):
- requires_access_dag("GET", DagAccessEntity.CODE)(fastapi_request,
Mock())
+ await inner(request, user, session)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="GET",
+ access_entity=DagAccessEntity.RUN,
+ details=DagDetails(id="unauthorized_dag", team_name=None),
+ user=user,
+ )
- auth_manager.is_authorized_dag.assert_called_once()
+ @pytest.mark.db_test
+ @pytest.mark.asyncio
+ @patch.object(DagModel, "get_team_name")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ async def
test_requires_access_backfill_backfill_not_found_falls_back_to_body(
+ self, mock_get_auth_manager, mock_get_team_name
+ ):
+ """When backfill_id is int but Backfill not found, dag_id from body is
used."""
+ auth_manager = Mock()
+ auth_manager.is_authorized_dag.return_value = True
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1"
+
+ session = Mock()
+ session.scalars.return_value.one_or_none.return_value = None
+
+ request = Mock()
+ request.path_params = {"backfill_id": 999}
+ request.json = AsyncMock(return_value={"dag_id": "fallback_dag_id"})
+
+ user = Mock()
+
+ inner = requires_access_backfill("POST")
+ await inner(request, user, session)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="POST",
+ access_entity=DagAccessEntity.RUN,
+ details=DagDetails(id="fallback_dag_id", team_name="team1"),
+ user=user,
+ )
@pytest.mark.parametrize(
("url", "expected_is_safe"),