This is an automated email from the ASF dual-hosted git repository.
jasonliu pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new a3f17030213 [v3-1-test] Deprecate BackfillDetails and use
DagAcccessEntity.Run for backfill p… (#61456)
a3f17030213 is described below
commit a3f170302139f22951bd8ab0b455ce1094e9e829
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Feb 5 05:22:16 2026 +0100
[v3-1-test] Deprecate BackfillDetails and use DagAcccessEntity.Run for
backfill p… (#61456)
* Deprecate BackfillDetails and use DagAcccessEntity.Run for backfill p…
(#61400)
(cherry picked from commit 9b48498f10386f9a0775f8c38b460c0b032fbbe2)
* Fix CI
---
.../docs/core-concepts/auth-manager/index.rst | 1 -
airflow-core/newsfragments/61400.significant.rst | 20 ++
.../api_fastapi/auth/managers/base_auth_manager.py | 18 +-
.../auth/managers/models/resource_details.py | 7 +-
.../auth/managers/simple/simple_auth_manager.py | 15 --
.../core_api/routes/public/backfills.py | 8 +-
.../api_fastapi/core_api/routes/ui/backfills.py | 3 +-
.../src/airflow/api_fastapi/core_api/security.py | 59 ++++--
.../managers/simple/test_simple_auth_manager.py | 3 -
.../auth/managers/test_base_auth_manager.py | 10 -
.../core_api/routes/public/test_backfills.py | 2 +-
.../unit/api_fastapi/core_api/test_security.py | 225 +++++++++++++++++++--
12 files changed, 285 insertions(+), 86 deletions(-)
diff --git a/airflow-core/docs/core-concepts/auth-manager/index.rst
b/airflow-core/docs/core-concepts/auth-manager/index.rst
index be626edde7f..b2bcae33268 100644
--- a/airflow-core/docs/core-concepts/auth-manager/index.rst
+++ b/airflow-core/docs/core-concepts/auth-manager/index.rst
@@ -136,7 +136,6 @@ These authorization methods are:
Also, ``is_authorized_dag`` is called for any entity related to Dags (e.g.
task instances, Dag runs, ...). This information is passed in ``access_entity``.
Example: ``auth_manager.is_authorized_dag(method="GET",
access_entity=DagAccessEntity.Run, details=DagDetails(id="dag-1"))`` asks
whether the user has permission to read the Dag runs of the Dag "dag-1".
-* ``is_authorized_backfill``: Return whether the user is authorized to access
Airflow backfills. Some details about the backfill can be provided (e.g. the
backfill ID).
* ``is_authorized_asset``: Return whether the user is authorized to access
Airflow assets. Some details about the asset can be provided (e.g. the asset
ID).
* ``is_authorized_asset_alias``: Return whether the user is authorized to
access Airflow asset aliases. Some details about the asset alias can be
provided (e.g. the asset alias ID).
* ``is_authorized_pool``: Return whether the user is authorized to access
Airflow pools. Some details about the pool can be provided (e.g. the pool name).
diff --git a/airflow-core/newsfragments/61400.significant.rst
b/airflow-core/newsfragments/61400.significant.rst
new file mode 100644
index 00000000000..7bdee4390a7
--- /dev/null
+++ b/airflow-core/newsfragments/61400.significant.rst
@@ -0,0 +1,20 @@
+AuthManager Backfill permissions are now handled by the
``requires_access_dag`` on the ``DagAccessEntity.Run``
+
+``is_authorized_backfill`` of the ``BaseAuthManager`` interface has been
removed. Core will no longer call this method and their
+provider counterpart implementation will be marked as deprecated.
+Permissions for backfill operations are now checked against the
``DagAccessEntity.Run`` permission using the existing
+``requires_access_dag`` decorator. In other words, if a user has permission to
run a DAG, they can perform backfill operations on it.
+
+Please update your security policies to ensure that users who need to perform
backfill operations have the appropriate ``DagAccessEntity.Run`` permissions.
(Users
+having the Backfill permissions without having the DagRun ones will no longer
be able to perform backfill operations without any update)
+
+* Types of change
+
+ * [ ] Dag changes
+ * [ ] Config changes
+ * [x] API changes
+ * [ ] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes
diff --git
a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
index d7d2acabe59..b625b6fc78f 100644
--- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
+++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
@@ -26,7 +26,7 @@ from jwt import InvalidTokenError
from sqlalchemy import select
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
-from airflow.api_fastapi.auth.managers.models.resource_details import
BackfillDetails, DagDetails
+from airflow.api_fastapi.auth.managers.models.resource_details import
DagDetails
from airflow.api_fastapi.auth.tokens import (
JWTGenerator,
JWTValidator,
@@ -198,22 +198,6 @@ class BaseAuthManager(Generic[T], LoggingMixin,
metaclass=ABCMeta):
:param details: optional details about the DAG
"""
- @abstractmethod
- def is_authorized_backfill(
- self,
- *,
- method: ResourceMethod,
- user: T,
- details: BackfillDetails | None = None,
- ) -> bool:
- """
- Return whether the user is authorized to perform a given action on a
backfill.
-
- :param method: the method to perform
- :param user: the user to performing the action
- :param details: optional details about the backfill
- """
-
@abstractmethod
def is_authorized_asset(
self,
diff --git
a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
index afce15e6b0a..22c46371f31 100644
---
a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
+++
b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
@@ -48,7 +48,12 @@ class DagDetails:
@dataclass
class BackfillDetails:
- """Represents the details of a backfill."""
+ """
+ Represents the details of a backfill.
+
+ .. deprecated:: 3.1.8
+ Use DagAccessEntity.Run instead for a dag level access control.
+ """
id: NonNegativeInt | None = None
diff --git
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
index 09250168e14..52f5144eef7 100644
---
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
+++
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
@@ -37,7 +37,6 @@ from termcolor import colored
from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX
from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
-from airflow.api_fastapi.auth.managers.models.resource_details import
BackfillDetails
from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
from airflow.api_fastapi.common.types import MenuItem
from airflow.configuration import AIRFLOW_HOME, conf
@@ -194,20 +193,6 @@ class
SimpleAuthManager(BaseAuthManager[SimpleAuthManagerUser]):
user=user,
)
- def is_authorized_backfill(
- self,
- *,
- method: ResourceMethod,
- user: SimpleAuthManagerUser,
- details: BackfillDetails | None = None,
- ) -> bool:
- return self._is_authorized(
- method=method,
- allow_get_role=SimpleAuthManagerRole.VIEWER,
- allow_role=SimpleAuthManagerRole.OP,
- user=user,
- )
-
def is_authorized_asset(
self,
*,
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
index b9c1416b2ce..7c63393c2f9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -25,7 +25,6 @@ from sqlalchemy import select, update
from sqlalchemy.orm import joinedload
from airflow._shared.timezones import timezone
-from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
from airflow.api_fastapi.common.db.common import (
SessionDep,
paginated_select,
@@ -42,7 +41,7 @@ from airflow.api_fastapi.core_api.datamodels.backfills import
(
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
-from airflow.api_fastapi.core_api.security import GetUserDep,
requires_access_backfill, requires_access_dag
+from airflow.api_fastapi.core_api.security import GetUserDep,
requires_access_backfill
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import DagNotFound
from airflow.models import DagRun
@@ -122,7 +121,6 @@ def get_backfill(
dependencies=[
Depends(action_logging()),
Depends(requires_access_backfill(method="PUT")),
- Depends(requires_access_dag(method="PUT",
access_entity=DagAccessEntity.RUN)),
],
)
def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) ->
BackfillResponse:
@@ -150,7 +148,6 @@ def pause_backfill(backfill_id: NonNegativeInt, session:
SessionDep) -> Backfill
dependencies=[
Depends(action_logging()),
Depends(requires_access_backfill(method="PUT")),
- Depends(requires_access_dag(method="PUT",
access_entity=DagAccessEntity.RUN)),
],
)
def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) ->
BackfillResponse:
@@ -176,7 +173,6 @@ def unpause_backfill(backfill_id: NonNegativeInt, session:
SessionDep) -> Backfi
),
dependencies=[
Depends(action_logging()),
- Depends(requires_access_dag(method="PUT",
access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="PUT")),
],
)
@@ -223,7 +219,6 @@ def cancel_backfill(backfill_id: NonNegativeInt, session:
SessionDep) -> Backfil
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT]),
dependencies=[
Depends(action_logging()),
- Depends(requires_access_dag(method="POST",
access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="POST")),
],
)
@@ -271,7 +266,6 @@ def create_backfill(
path="/dry_run",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT]),
dependencies=[
- Depends(requires_access_dag(method="POST",
access_entity=DagAccessEntity.RUN)),
Depends(requires_access_backfill(method="POST")),
],
)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
index 24bf0f66fc3..9d1630689f7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
@@ -36,7 +36,7 @@ from airflow.api_fastapi.core_api.datamodels.backfills import
BackfillCollection
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
-from airflow.api_fastapi.core_api.security import requires_access_backfill,
requires_access_dag
+from airflow.api_fastapi.core_api.security import requires_access_backfill
from airflow.models.backfill import Backfill
backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills")
@@ -47,7 +47,6 @@ backfills_router = AirflowRouter(tags=["Backfill"],
prefix="/backfills")
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
dependencies=[
Depends(requires_access_backfill(method="GET")),
- Depends(requires_access_dag(method="GET")),
],
)
def list_backfills_ui(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py
b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index ed75cf305ae..9d31ed4bd56 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -16,16 +16,17 @@
# under the License.
from __future__ import annotations
-from collections.abc import Callable
+from collections.abc import Callable, Coroutine
+from json import JSONDecodeError
from pathlib import Path
-from typing import TYPE_CHECKING, Annotated, cast
+from typing import TYPE_CHECKING, Annotated, Any, cast
from urllib.parse import ParseResult, unquote, urljoin, urlparse
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer,
OAuth2PasswordBearer
from jwt import ExpiredSignatureError, InvalidTokenError
-from pydantic import NonNegativeInt
-from sqlalchemy import or_
+from sqlalchemy import or_, select
+from sqlalchemy.orm import Session
from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.base_auth_manager import
COOKIE_NAME_JWT_TOKEN
@@ -39,7 +40,6 @@ from
airflow.api_fastapi.auth.managers.models.resource_details import (
AccessView,
AssetAliasDetails,
AssetDetails,
- BackfillDetails,
ConfigurationDetails,
ConnectionDetails,
DagAccessEntity,
@@ -47,6 +47,7 @@ from
airflow.api_fastapi.auth.managers.models.resource_details import (
PoolDetails,
VariableDetails,
)
+from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.core_api.base import OrmClause
from airflow.api_fastapi.core_api.datamodels.common import (
BulkAction,
@@ -61,6 +62,7 @@ from airflow.api_fastapi.core_api.datamodels.pools import
PoolBody
from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
from airflow.configuration import conf
from airflow.models import Connection, Pool, Variable
+from airflow.models.backfill import Backfill
from airflow.models.dag import DagModel, DagRun, DagTag
from airflow.models.dagwarning import DagWarning
from airflow.models.log import Log
@@ -129,14 +131,21 @@ GetUserDep = Annotated[BaseUser, Depends(get_user)]
def requires_access_dag(
- method: ResourceMethod, access_entity: DagAccessEntity | None = None
+ method: ResourceMethod,
+ access_entity: DagAccessEntity | None = None,
+ param_dag_id: str | None = None,
) -> Callable[[Request, BaseUser], None]:
def inner(
request: Request,
user: GetUserDep,
) -> None:
- dag_id = request.path_params.get("dag_id") or
request.query_params.get("dag_id")
- dag_id = dag_id if dag_id != "~" else None
+ # Required for the closure to capture the dag_id but still be able to
mutate it.
+ # Prevent from using a nonlocal statement causing test failures.
+ dag_id = param_dag_id
+ if dag_id is None:
+ dag_id = request.path_params.get("dag_id") or
request.query_params.get("dag_id")
+ dag_id = dag_id if dag_id != "~" else None
+
team_name = DagModel.get_team_name(dag_id) if dag_id else None
_requires_access(
@@ -246,17 +255,35 @@ ReadableTagsFilterDep = Annotated[
]
-def requires_access_backfill(method: ResourceMethod) -> Callable[[Request,
BaseUser], None]:
- def inner(
+def requires_access_backfill(
+ method: ResourceMethod,
+) -> Callable[[Request, BaseUser, Session], Coroutine[Any, Any, None]]:
+ """Wrap ``requires_access_dag`` and extract the dag_id from the
backfill_id."""
+
+ async def inner(
request: Request,
user: GetUserDep,
+ session: SessionDep,
) -> None:
- backfill_id: NonNegativeInt | None =
request.path_params.get("backfill_id")
-
- _requires_access(
- is_authorized_callback=lambda:
get_auth_manager().is_authorized_backfill(
- method=method, details=BackfillDetails(id=backfill_id),
user=user
- ),
+ dag_id = None
+
+ # Try to retrieve the dag_id from the backfill_id path param
+ backfill_id = request.path_params.get("backfill_id")
+ if backfill_id is not None and isinstance(backfill_id, int):
+ backfill = session.scalars(select(Backfill).where(Backfill.id ==
backfill_id)).one_or_none()
+ dag_id = backfill.dag_id if backfill else None
+
+ # Try to retrieve the dag_id from the request body (POST backfill)
+ if dag_id is None:
+ try:
+ dag_id = (await request.json()).get("dag_id")
+ except JSONDecodeError:
+ # Not a json body, ignore
+ pass
+
+ requires_access_dag(method, DagAccessEntity.RUN, dag_id)(
+ request,
+ user,
)
return inner
diff --git
a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
index de58efca7e1..75a19ea81be 100644
---
a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
+++
b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
@@ -138,7 +138,6 @@ class TestSimpleAuthManager:
"is_authorized_dag",
"is_authorized_asset",
"is_authorized_asset_alias",
- "is_authorized_backfill",
"is_authorized_pool",
"is_authorized_variable",
],
@@ -194,7 +193,6 @@ class TestSimpleAuthManager:
"is_authorized_connection",
"is_authorized_asset",
"is_authorized_asset_alias",
- "is_authorized_backfill",
"is_authorized_pool",
"is_authorized_variable",
],
@@ -240,7 +238,6 @@ class TestSimpleAuthManager:
"is_authorized_dag",
"is_authorized_asset",
"is_authorized_asset_alias",
- "is_authorized_backfill",
"is_authorized_pool",
],
)
diff --git
a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
index 98217cef115..9cf86812c42 100644
---
a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
+++
b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
@@ -25,7 +25,6 @@ from jwt import InvalidTokenError
from airflow.api_fastapi.auth.managers.base_auth_manager import
BaseAuthManager, T
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
from airflow.api_fastapi.auth.managers.models.resource_details import (
- BackfillDetails,
ConnectionDetails,
DagDetails,
PoolDetails,
@@ -91,15 +90,6 @@ class
EmptyAuthManager(BaseAuthManager[BaseAuthManagerUserTest]):
) -> bool:
raise NotImplementedError()
- def is_authorized_backfill(
- self,
- *,
- method: ResourceMethod,
- details: BackfillDetails | None = None,
- user: BaseAuthManagerUserTest | None = None,
- ) -> bool:
- raise NotImplementedError()
-
def is_authorized_asset(
self,
*,
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
index 6ce4a6ac85a..3b8a4fada0d 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
@@ -125,7 +125,7 @@ class TestListBackfills(TestBackfillEndpoint):
session.add(b)
session.commit()
- with assert_queries_count(2):
+ with assert_queries_count(3):
response = test_client.get(f"/backfills?dag_id={dag.dag_id}")
assert response.status_code == 200
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
index f86a511aed0..ccd9f53ea2f 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
@@ -27,6 +27,7 @@ from airflow.api_fastapi.auth.managers.base_auth_manager
import COOKIE_NAME_JWT_
from airflow.api_fastapi.auth.managers.models.resource_details import (
ConnectionDetails,
DagAccessEntity,
+ DagDetails,
PoolDetails,
VariableDetails,
)
@@ -38,6 +39,7 @@ from airflow.api_fastapi.core_api.datamodels.variables import
VariableBody
from airflow.api_fastapi.core_api.security import (
get_user,
is_safe_url,
+ requires_access_backfill,
requires_access_connection,
requires_access_connection_bulk,
requires_access_dag,
@@ -48,6 +50,7 @@ from airflow.api_fastapi.core_api.security import (
resolve_user_from_token,
)
from airflow.models import Connection, Pool, Variable
+from airflow.models.dag import DagModel
from tests_common.test_utils.config import conf_vars
@@ -147,37 +150,233 @@ class TestFastApiSecurity:
mock_resolve_user_from_token.assert_called_once_with(expected)
@pytest.mark.db_test
+ @pytest.mark.parametrize(
+ ("param_dag_id", "path_params", "query_params", "expected_dag_id"),
+ [
+ # param_dag_id takes precedence when provided
+ ("dag_id_from_param", {}, {}, "dag_id_from_param"),
+ (
+ "dag_id_from_param",
+ {"dag_id": "dag_id_from_path_params"},
+ {"dag_id": "dag_id_from_query_params"},
+ "dag_id_from_param",
+ ),
+ # path_params used when param_dag_id is None
+ (None, {"dag_id": "dag_id_from_path_params"}, {},
"dag_id_from_path_params"),
+ (
+ None,
+ {"dag_id": "dag_id_from_path_params"},
+ {"dag_id": "dag_id_from_query_params"},
+ "dag_id_from_path_params",
+ ),
+ # query_params used when param_dag_id and path_params have no
dag_id
+ (None, {}, {"dag_id": "dag_id_from_query_params"},
"dag_id_from_query_params"),
+ # "~" is treated as None
+ (None, {"dag_id": "~"}, {}, None),
+ (None, {}, {"dag_id": "~"}, None),
+ # No source → dag_id None
+ (None, {}, {}, None),
+ ],
+ )
+ @patch.object(DagModel, "get_team_name")
@patch("airflow.api_fastapi.core_api.security.get_auth_manager")
- async def test_requires_access_dag_authorized(self, mock_get_auth_manager):
+ def test_requires_access_dag_authorized(
+ self,
+ mock_get_auth_manager,
+ mock_get_team_name,
+ param_dag_id,
+ path_params,
+ query_params,
+ expected_dag_id,
+ ):
auth_manager = Mock()
auth_manager.is_authorized_dag.return_value = True
mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1" if expected_dag_id else None
+
fastapi_request = Mock()
- fastapi_request.path_params = {}
- fastapi_request.query_params = {}
+ fastapi_request.path_params = path_params
+ fastapi_request.query_params = query_params
+ user = Mock()
- requires_access_dag("GET", DagAccessEntity.CODE)(fastapi_request,
Mock())
+ requires_access_dag("GET", DagAccessEntity.CODE,
param_dag_id=param_dag_id)(fastapi_request, user)
- auth_manager.is_authorized_dag.assert_called_once()
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="GET",
+ access_entity=DagAccessEntity.CODE,
+ details=DagDetails(id=expected_dag_id,
team_name=mock_get_team_name.return_value),
+ user=user,
+ )
@pytest.mark.db_test
+ @pytest.mark.parametrize(
+ ("param_dag_id", "path_params", "query_params", "expected_dag_id"),
+ [
+ ("dag_id_from_param", {}, {}, "dag_id_from_param"),
+ (None, {"dag_id": "dag_id_from_path_params"}, {},
"dag_id_from_path_params"),
+ (None, {}, {"dag_id": "dag_id_from_query_params"},
"dag_id_from_query_params"),
+ (None, {}, {}, None),
+ ],
+ )
+ @patch.object(DagModel, "get_team_name")
@patch("airflow.api_fastapi.core_api.security.get_auth_manager")
- async def test_requires_access_dag_unauthorized(self,
mock_get_auth_manager):
+ def test_requires_access_dag_unauthorized(
+ self,
+ mock_get_auth_manager,
+ mock_get_team_name,
+ param_dag_id,
+ path_params,
+ query_params,
+ expected_dag_id,
+ ):
auth_manager = Mock()
auth_manager.is_authorized_dag.return_value = False
mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1" if expected_dag_id else None
+
fastapi_request = Mock()
- fastapi_request.path_params = {}
- fastapi_request.query_params = {}
+ fastapi_request.path_params = path_params
+ fastapi_request.query_params = query_params
+ user = Mock()
+
+ with pytest.raises(HTTPException, match="Forbidden"):
+ requires_access_dag("GET", DagAccessEntity.CODE,
param_dag_id=param_dag_id)(fastapi_request, user)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="GET",
+ access_entity=DagAccessEntity.CODE,
+ details=DagDetails(id=expected_dag_id,
team_name=mock_get_team_name.return_value),
+ user=user,
+ )
- mock_request = Mock()
- mock_request.path_params.return_value = {}
- mock_request.query_params.return_value = {}
+ @pytest.mark.db_test
+ @pytest.mark.asyncio
+ @patch.object(DagModel, "get_team_name")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ async def test_requires_access_backfill_authorized_from_path(
+ self, mock_get_auth_manager, mock_get_team_name
+ ):
+ """When backfill_id is in path and Backfill exists, dag_id from
backfill is used."""
+ auth_manager = Mock()
+ auth_manager.is_authorized_dag.return_value = True
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1"
+
+ backfill = Mock()
+ backfill.dag_id = "backfill_dag_id"
+ session = Mock()
+ session.scalars.return_value.one_or_none.return_value = backfill
+
+ request = Mock()
+ request.path_params = {"backfill_id": 42}
+ request.json = AsyncMock(return_value={})
+ user = Mock()
+
+ inner = requires_access_backfill("PUT")
+ await inner(request, user, session)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="PUT",
+ access_entity=DagAccessEntity.RUN,
+ details=DagDetails(id="backfill_dag_id", team_name="team1"),
+ user=user,
+ )
+
+ @pytest.mark.db_test
+ @pytest.mark.asyncio
+ @patch.object(DagModel, "get_team_name")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ async def test_requires_access_backfill_authorized_from_body(
+ self, mock_get_auth_manager, mock_get_team_name
+ ):
+ """When backfill_id is missing or not int, dag_id can come from
request body (POST backfill)."""
+ auth_manager = Mock()
+ auth_manager.is_authorized_dag.return_value = True
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1"
+
+ session = Mock()
+ session.scalars.return_value.one_or_none.return_value = None
+
+ request = Mock()
+ request.path_params = {}
+ request.json = AsyncMock(return_value={"dag_id": "body_dag_id"})
+
+ user = Mock()
+
+ inner = requires_access_backfill("POST")
+ await inner(request, user, session)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="POST",
+ access_entity=DagAccessEntity.RUN,
+ details=DagDetails(id="body_dag_id", team_name="team1"),
+ user=user,
+ )
+
+ @pytest.mark.db_test
+ @pytest.mark.asyncio
+ @patch.object(DagModel, "get_team_name")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ async def test_requires_access_backfill_unauthorized(self,
mock_get_auth_manager, mock_get_team_name):
+ """When is_authorized_dag returns False, Forbidden is raised."""
+ auth_manager = Mock()
+ auth_manager.is_authorized_dag.return_value = False
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = None
+
+ backfill = Mock()
+ backfill.dag_id = "unauthorized_dag"
+ session = Mock()
+ session.scalars.return_value.one_or_none.return_value = backfill
+
+ request = Mock()
+ request.path_params = {"backfill_id": 1}
+ user = Mock()
+
+ inner = requires_access_backfill("GET")
with pytest.raises(HTTPException, match="Forbidden"):
- requires_access_dag("GET", DagAccessEntity.CODE)(fastapi_request,
Mock())
+ await inner(request, user, session)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="GET",
+ access_entity=DagAccessEntity.RUN,
+ details=DagDetails(id="unauthorized_dag", team_name=None),
+ user=user,
+ )
+
+ @pytest.mark.db_test
+ @pytest.mark.asyncio
+ @patch.object(DagModel, "get_team_name")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ async def
test_requires_access_backfill_backfill_not_found_falls_back_to_body(
+ self, mock_get_auth_manager, mock_get_team_name
+ ):
+ """When backfill_id is int but Backfill not found, dag_id from body is
used."""
+ auth_manager = Mock()
+ auth_manager.is_authorized_dag.return_value = True
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_team_name.return_value = "team1"
+
+ session = Mock()
+ session.scalars.return_value.one_or_none.return_value = None
+
+ request = Mock()
+ request.path_params = {"backfill_id": 999}
+ request.json = AsyncMock(return_value={"dag_id": "fallback_dag_id"})
- auth_manager.is_authorized_dag.assert_called_once()
+ user = Mock()
+
+ inner = requires_access_backfill("POST")
+ await inner(request, user, session)
+
+ auth_manager.is_authorized_dag.assert_called_once_with(
+ method="POST",
+ access_entity=DagAccessEntity.RUN,
+ details=DagDetails(id="fallback_dag_id", team_name="team1"),
+ user=user,
+ )
@pytest.mark.parametrize(
"url, expected_is_safe",