This is an automated email from the ASF dual-hosted git repository.

jasonliu pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new a3f17030213 [v3-1-test] Deprecate BackfillDetails and use 
DagAcccessEntity.Run for backfill p… (#61456)
a3f17030213 is described below

commit a3f170302139f22951bd8ab0b455ce1094e9e829
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Feb 5 05:22:16 2026 +0100

    [v3-1-test] Deprecate BackfillDetails and use DagAcccessEntity.Run for 
backfill p… (#61456)
    
    * Deprecate BackfillDetails and use DagAcccessEntity.Run for backfill p… 
(#61400)
    
    (cherry picked from commit 9b48498f10386f9a0775f8c38b460c0b032fbbe2)
    
    * Fix CI
---
 .../docs/core-concepts/auth-manager/index.rst      |   1 -
 airflow-core/newsfragments/61400.significant.rst   |  20 ++
 .../api_fastapi/auth/managers/base_auth_manager.py |  18 +-
 .../auth/managers/models/resource_details.py       |   7 +-
 .../auth/managers/simple/simple_auth_manager.py    |  15 --
 .../core_api/routes/public/backfills.py            |   8 +-
 .../api_fastapi/core_api/routes/ui/backfills.py    |   3 +-
 .../src/airflow/api_fastapi/core_api/security.py   |  59 ++++--
 .../managers/simple/test_simple_auth_manager.py    |   3 -
 .../auth/managers/test_base_auth_manager.py        |  10 -
 .../core_api/routes/public/test_backfills.py       |   2 +-
 .../unit/api_fastapi/core_api/test_security.py     | 225 +++++++++++++++++++--
 12 files changed, 285 insertions(+), 86 deletions(-)

diff --git a/airflow-core/docs/core-concepts/auth-manager/index.rst 
b/airflow-core/docs/core-concepts/auth-manager/index.rst
index be626edde7f..b2bcae33268 100644
--- a/airflow-core/docs/core-concepts/auth-manager/index.rst
+++ b/airflow-core/docs/core-concepts/auth-manager/index.rst
@@ -136,7 +136,6 @@ These authorization methods are:
   Also, ``is_authorized_dag`` is called for any entity related to Dags (e.g. 
task instances, Dag runs, ...). This information is passed in ``access_entity``.
   Example: ``auth_manager.is_authorized_dag(method="GET", 
access_entity=DagAccessEntity.Run, details=DagDetails(id="dag-1"))`` asks
   whether the user has permission to read the Dag runs of the Dag "dag-1".
-* ``is_authorized_backfill``: Return whether the user is authorized to access 
Airflow backfills. Some details about the backfill can be provided (e.g. the 
backfill ID).
 * ``is_authorized_asset``: Return whether the user is authorized to access 
Airflow assets. Some details about the asset can be provided (e.g. the asset 
ID).
 * ``is_authorized_asset_alias``: Return whether the user is authorized to 
access Airflow asset aliases. Some details about the asset alias can be 
provided (e.g. the asset alias ID).
 * ``is_authorized_pool``: Return whether the user is authorized to access 
Airflow pools. Some details about the pool can be provided (e.g. the pool name).
diff --git a/airflow-core/newsfragments/61400.significant.rst 
b/airflow-core/newsfragments/61400.significant.rst
new file mode 100644
index 00000000000..7bdee4390a7
--- /dev/null
+++ b/airflow-core/newsfragments/61400.significant.rst
@@ -0,0 +1,20 @@
+AuthManager backfill permissions are now checked by ``requires_access_dag`` 
against ``DagAccessEntity.RUN``
+
+``is_authorized_backfill`` of the ``BaseAuthManager`` interface has been 
removed. Core will no longer call this method, and the
+corresponding provider implementations will be marked as deprecated.
+Permissions for backfill operations are now checked against the 
``DagAccessEntity.Run`` permission using the existing
+``requires_access_dag`` decorator. In other words, if a user has permission to 
run a DAG, they can perform backfill operations on it.
+
+Please update your security policies to ensure that users who need to perform 
backfill operations have the appropriate ``DagAccessEntity.RUN`` permissions. 
(Users
+who have the Backfill permissions but not the Dag run ones will no longer be 
able to perform backfill operations until their permissions are updated.)
+
+* Types of change
+
+  * [ ] Dag changes
+  * [ ] Config changes
+  * [x] API changes
+  * [ ] CLI changes
+  * [x] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [ ] Code interface changes
diff --git 
a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py 
b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
index d7d2acabe59..b625b6fc78f 100644
--- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
+++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
@@ -26,7 +26,7 @@ from jwt import InvalidTokenError
 from sqlalchemy import select
 
 from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
-from airflow.api_fastapi.auth.managers.models.resource_details import 
BackfillDetails, DagDetails
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagDetails
 from airflow.api_fastapi.auth.tokens import (
     JWTGenerator,
     JWTValidator,
@@ -198,22 +198,6 @@ class BaseAuthManager(Generic[T], LoggingMixin, 
metaclass=ABCMeta):
         :param details: optional details about the DAG
         """
 
-    @abstractmethod
-    def is_authorized_backfill(
-        self,
-        *,
-        method: ResourceMethod,
-        user: T,
-        details: BackfillDetails | None = None,
-    ) -> bool:
-        """
-        Return whether the user is authorized to perform a given action on a 
backfill.
-
-        :param method: the method to perform
-        :param user: the user to performing the action
-        :param details: optional details about the backfill
-        """
-
     @abstractmethod
     def is_authorized_asset(
         self,
diff --git 
a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py 
b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
index afce15e6b0a..22c46371f31 100644
--- 
a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
+++ 
b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py
@@ -48,7 +48,12 @@ class DagDetails:
 
 @dataclass
 class BackfillDetails:
-    """Represents the details of a backfill."""
+    """
+    Represents the details of a backfill.
+
+    .. deprecated:: 3.1.8
+        Use DagAccessEntity.RUN instead for Dag-level access control.
+    """
 
     id: NonNegativeInt | None = None
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
 
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
index 09250168e14..52f5144eef7 100644
--- 
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
+++ 
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
@@ -37,7 +37,6 @@ from termcolor import colored
 
 from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX
 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
-from airflow.api_fastapi.auth.managers.models.resource_details import 
BackfillDetails
 from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
 from airflow.api_fastapi.common.types import MenuItem
 from airflow.configuration import AIRFLOW_HOME, conf
@@ -194,20 +193,6 @@ class 
SimpleAuthManager(BaseAuthManager[SimpleAuthManagerUser]):
             user=user,
         )
 
-    def is_authorized_backfill(
-        self,
-        *,
-        method: ResourceMethod,
-        user: SimpleAuthManagerUser,
-        details: BackfillDetails | None = None,
-    ) -> bool:
-        return self._is_authorized(
-            method=method,
-            allow_get_role=SimpleAuthManagerRole.VIEWER,
-            allow_role=SimpleAuthManagerRole.OP,
-            user=user,
-        )
-
     def is_authorized_asset(
         self,
         *,
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
index b9c1416b2ce..7c63393c2f9 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -25,7 +25,6 @@ from sqlalchemy import select, update
 from sqlalchemy.orm import joinedload
 
 from airflow._shared.timezones import timezone
-from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
 from airflow.api_fastapi.common.db.common import (
     SessionDep,
     paginated_select,
@@ -42,7 +41,7 @@ from airflow.api_fastapi.core_api.datamodels.backfills import 
(
 from airflow.api_fastapi.core_api.openapi.exceptions import (
     create_openapi_http_exception_doc,
 )
-from airflow.api_fastapi.core_api.security import GetUserDep, 
requires_access_backfill, requires_access_dag
+from airflow.api_fastapi.core_api.security import GetUserDep, 
requires_access_backfill
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.exceptions import DagNotFound
 from airflow.models import DagRun
@@ -122,7 +121,6 @@ def get_backfill(
     dependencies=[
         Depends(action_logging()),
         Depends(requires_access_backfill(method="PUT")),
-        Depends(requires_access_dag(method="PUT", 
access_entity=DagAccessEntity.RUN)),
     ],
 )
 def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> 
BackfillResponse:
@@ -150,7 +148,6 @@ def pause_backfill(backfill_id: NonNegativeInt, session: 
SessionDep) -> Backfill
     dependencies=[
         Depends(action_logging()),
         Depends(requires_access_backfill(method="PUT")),
-        Depends(requires_access_dag(method="PUT", 
access_entity=DagAccessEntity.RUN)),
     ],
 )
 def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> 
BackfillResponse:
@@ -176,7 +173,6 @@ def unpause_backfill(backfill_id: NonNegativeInt, session: 
SessionDep) -> Backfi
     ),
     dependencies=[
         Depends(action_logging()),
-        Depends(requires_access_dag(method="PUT", 
access_entity=DagAccessEntity.RUN)),
         Depends(requires_access_backfill(method="PUT")),
     ],
 )
@@ -223,7 +219,6 @@ def cancel_backfill(backfill_id: NonNegativeInt, session: 
SessionDep) -> Backfil
     responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, 
status.HTTP_409_CONFLICT]),
     dependencies=[
         Depends(action_logging()),
-        Depends(requires_access_dag(method="POST", 
access_entity=DagAccessEntity.RUN)),
         Depends(requires_access_backfill(method="POST")),
     ],
 )
@@ -271,7 +266,6 @@ def create_backfill(
     path="/dry_run",
     responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, 
status.HTTP_409_CONFLICT]),
     dependencies=[
-        Depends(requires_access_dag(method="POST", 
access_entity=DagAccessEntity.RUN)),
         Depends(requires_access_backfill(method="POST")),
     ],
 )
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
index 24bf0f66fc3..9d1630689f7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py
@@ -36,7 +36,7 @@ from airflow.api_fastapi.core_api.datamodels.backfills import 
BackfillCollection
 from airflow.api_fastapi.core_api.openapi.exceptions import (
     create_openapi_http_exception_doc,
 )
-from airflow.api_fastapi.core_api.security import requires_access_backfill, 
requires_access_dag
+from airflow.api_fastapi.core_api.security import requires_access_backfill
 from airflow.models.backfill import Backfill
 
 backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills")
@@ -47,7 +47,6 @@ backfills_router = AirflowRouter(tags=["Backfill"], 
prefix="/backfills")
     responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
     dependencies=[
         Depends(requires_access_backfill(method="GET")),
-        Depends(requires_access_dag(method="GET")),
     ],
 )
 def list_backfills_ui(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py 
b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index ed75cf305ae..9d31ed4bd56 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -16,16 +16,17 @@
 # under the License.
 from __future__ import annotations
 
-from collections.abc import Callable
+from collections.abc import Callable, Coroutine
+from json import JSONDecodeError
 from pathlib import Path
-from typing import TYPE_CHECKING, Annotated, cast
+from typing import TYPE_CHECKING, Annotated, Any, cast
 from urllib.parse import ParseResult, unquote, urljoin, urlparse
 
 from fastapi import Depends, HTTPException, Request, status
 from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer, 
OAuth2PasswordBearer
 from jwt import ExpiredSignatureError, InvalidTokenError
-from pydantic import NonNegativeInt
-from sqlalchemy import or_
+from sqlalchemy import or_, select
+from sqlalchemy.orm import Session
 
 from airflow.api_fastapi.app import get_auth_manager
 from airflow.api_fastapi.auth.managers.base_auth_manager import 
COOKIE_NAME_JWT_TOKEN
@@ -39,7 +40,6 @@ from 
airflow.api_fastapi.auth.managers.models.resource_details import (
     AccessView,
     AssetAliasDetails,
     AssetDetails,
-    BackfillDetails,
     ConfigurationDetails,
     ConnectionDetails,
     DagAccessEntity,
@@ -47,6 +47,7 @@ from 
airflow.api_fastapi.auth.managers.models.resource_details import (
     PoolDetails,
     VariableDetails,
 )
+from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.core_api.base import OrmClause
 from airflow.api_fastapi.core_api.datamodels.common import (
     BulkAction,
@@ -61,6 +62,7 @@ from airflow.api_fastapi.core_api.datamodels.pools import 
PoolBody
 from airflow.api_fastapi.core_api.datamodels.variables import VariableBody
 from airflow.configuration import conf
 from airflow.models import Connection, Pool, Variable
+from airflow.models.backfill import Backfill
 from airflow.models.dag import DagModel, DagRun, DagTag
 from airflow.models.dagwarning import DagWarning
 from airflow.models.log import Log
@@ -129,14 +131,21 @@ GetUserDep = Annotated[BaseUser, Depends(get_user)]
 
 
 def requires_access_dag(
-    method: ResourceMethod, access_entity: DagAccessEntity | None = None
+    method: ResourceMethod,
+    access_entity: DagAccessEntity | None = None,
+    param_dag_id: str | None = None,
 ) -> Callable[[Request, BaseUser], None]:
     def inner(
         request: Request,
         user: GetUserDep,
     ) -> None:
-        dag_id = request.path_params.get("dag_id") or 
request.query_params.get("dag_id")
-        dag_id = dag_id if dag_id != "~" else None
+        # Copy the closure's param_dag_id into a local so it can be reassigned 
below.
+        # This avoids a nonlocal statement, which caused test failures.
+        dag_id = param_dag_id
+        if dag_id is None:
+            dag_id = request.path_params.get("dag_id") or 
request.query_params.get("dag_id")
+            dag_id = dag_id if dag_id != "~" else None
+
         team_name = DagModel.get_team_name(dag_id) if dag_id else None
 
         _requires_access(
@@ -246,17 +255,35 @@ ReadableTagsFilterDep = Annotated[
 ]
 
 
-def requires_access_backfill(method: ResourceMethod) -> Callable[[Request, 
BaseUser], None]:
-    def inner(
+def requires_access_backfill(
+    method: ResourceMethod,
+) -> Callable[[Request, BaseUser, Session], Coroutine[Any, Any, None]]:
+    """Wrap ``requires_access_dag`` and extract the dag_id from the 
backfill_id."""
+
+    async def inner(
         request: Request,
         user: GetUserDep,
+        session: SessionDep,
     ) -> None:
-        backfill_id: NonNegativeInt | None = 
request.path_params.get("backfill_id")
-
-        _requires_access(
-            is_authorized_callback=lambda: 
get_auth_manager().is_authorized_backfill(
-                method=method, details=BackfillDetails(id=backfill_id), 
user=user
-            ),
+        dag_id = None
+
+        # Try to retrieve the dag_id from the backfill_id path param
+        backfill_id = request.path_params.get("backfill_id")
+        if backfill_id is not None and isinstance(backfill_id, int):
+            backfill = session.scalars(select(Backfill).where(Backfill.id == 
backfill_id)).one_or_none()
+            dag_id = backfill.dag_id if backfill else None
+
+        # Try to retrieve the dag_id from the request body (POST backfill)
+        if dag_id is None:
+            try:
+                dag_id = (await request.json()).get("dag_id")
+            except JSONDecodeError:
+                # Not a json body, ignore
+                pass
+
+        requires_access_dag(method, DagAccessEntity.RUN, dag_id)(
+            request,
+            user,
         )
 
     return inner
diff --git 
a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
 
b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
index de58efca7e1..75a19ea81be 100644
--- 
a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
+++ 
b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py
@@ -138,7 +138,6 @@ class TestSimpleAuthManager:
             "is_authorized_dag",
             "is_authorized_asset",
             "is_authorized_asset_alias",
-            "is_authorized_backfill",
             "is_authorized_pool",
             "is_authorized_variable",
         ],
@@ -194,7 +193,6 @@ class TestSimpleAuthManager:
             "is_authorized_connection",
             "is_authorized_asset",
             "is_authorized_asset_alias",
-            "is_authorized_backfill",
             "is_authorized_pool",
             "is_authorized_variable",
         ],
@@ -240,7 +238,6 @@ class TestSimpleAuthManager:
             "is_authorized_dag",
             "is_authorized_asset",
             "is_authorized_asset_alias",
-            "is_authorized_backfill",
             "is_authorized_pool",
         ],
     )
diff --git 
a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py 
b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
index 98217cef115..9cf86812c42 100644
--- 
a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
+++ 
b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py
@@ -25,7 +25,6 @@ from jwt import InvalidTokenError
 from airflow.api_fastapi.auth.managers.base_auth_manager import 
BaseAuthManager, T
 from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
 from airflow.api_fastapi.auth.managers.models.resource_details import (
-    BackfillDetails,
     ConnectionDetails,
     DagDetails,
     PoolDetails,
@@ -91,15 +90,6 @@ class 
EmptyAuthManager(BaseAuthManager[BaseAuthManagerUserTest]):
     ) -> bool:
         raise NotImplementedError()
 
-    def is_authorized_backfill(
-        self,
-        *,
-        method: ResourceMethod,
-        details: BackfillDetails | None = None,
-        user: BaseAuthManagerUserTest | None = None,
-    ) -> bool:
-        raise NotImplementedError()
-
     def is_authorized_asset(
         self,
         *,
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
index 6ce4a6ac85a..3b8a4fada0d 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
@@ -125,7 +125,7 @@ class TestListBackfills(TestBackfillEndpoint):
         session.add(b)
         session.commit()
 
-        with assert_queries_count(2):
+        with assert_queries_count(3):
             response = test_client.get(f"/backfills?dag_id={dag.dag_id}")
 
         assert response.status_code == 200
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py 
b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
index f86a511aed0..ccd9f53ea2f 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
@@ -27,6 +27,7 @@ from airflow.api_fastapi.auth.managers.base_auth_manager 
import COOKIE_NAME_JWT_
 from airflow.api_fastapi.auth.managers.models.resource_details import (
     ConnectionDetails,
     DagAccessEntity,
+    DagDetails,
     PoolDetails,
     VariableDetails,
 )
@@ -38,6 +39,7 @@ from airflow.api_fastapi.core_api.datamodels.variables import 
VariableBody
 from airflow.api_fastapi.core_api.security import (
     get_user,
     is_safe_url,
+    requires_access_backfill,
     requires_access_connection,
     requires_access_connection_bulk,
     requires_access_dag,
@@ -48,6 +50,7 @@ from airflow.api_fastapi.core_api.security import (
     resolve_user_from_token,
 )
 from airflow.models import Connection, Pool, Variable
+from airflow.models.dag import DagModel
 
 from tests_common.test_utils.config import conf_vars
 
@@ -147,37 +150,233 @@ class TestFastApiSecurity:
         mock_resolve_user_from_token.assert_called_once_with(expected)
 
     @pytest.mark.db_test
+    @pytest.mark.parametrize(
+        ("param_dag_id", "path_params", "query_params", "expected_dag_id"),
+        [
+            # param_dag_id takes precedence when provided
+            ("dag_id_from_param", {}, {}, "dag_id_from_param"),
+            (
+                "dag_id_from_param",
+                {"dag_id": "dag_id_from_path_params"},
+                {"dag_id": "dag_id_from_query_params"},
+                "dag_id_from_param",
+            ),
+            # path_params used when param_dag_id is None
+            (None, {"dag_id": "dag_id_from_path_params"}, {}, 
"dag_id_from_path_params"),
+            (
+                None,
+                {"dag_id": "dag_id_from_path_params"},
+                {"dag_id": "dag_id_from_query_params"},
+                "dag_id_from_path_params",
+            ),
+            # query_params used when param_dag_id and path_params have no 
dag_id
+            (None, {}, {"dag_id": "dag_id_from_query_params"}, 
"dag_id_from_query_params"),
+            # "~" is treated as None
+            (None, {"dag_id": "~"}, {}, None),
+            (None, {}, {"dag_id": "~"}, None),
+            # No source → dag_id None
+            (None, {}, {}, None),
+        ],
+    )
+    @patch.object(DagModel, "get_team_name")
     @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
-    async def test_requires_access_dag_authorized(self, mock_get_auth_manager):
+    def test_requires_access_dag_authorized(
+        self,
+        mock_get_auth_manager,
+        mock_get_team_name,
+        param_dag_id,
+        path_params,
+        query_params,
+        expected_dag_id,
+    ):
         auth_manager = Mock()
         auth_manager.is_authorized_dag.return_value = True
         mock_get_auth_manager.return_value = auth_manager
+        mock_get_team_name.return_value = "team1" if expected_dag_id else None
+
         fastapi_request = Mock()
-        fastapi_request.path_params = {}
-        fastapi_request.query_params = {}
+        fastapi_request.path_params = path_params
+        fastapi_request.query_params = query_params
+        user = Mock()
 
-        requires_access_dag("GET", DagAccessEntity.CODE)(fastapi_request, 
Mock())
+        requires_access_dag("GET", DagAccessEntity.CODE, 
param_dag_id=param_dag_id)(fastapi_request, user)
 
-        auth_manager.is_authorized_dag.assert_called_once()
+        auth_manager.is_authorized_dag.assert_called_once_with(
+            method="GET",
+            access_entity=DagAccessEntity.CODE,
+            details=DagDetails(id=expected_dag_id, 
team_name=mock_get_team_name.return_value),
+            user=user,
+        )
 
     @pytest.mark.db_test
+    @pytest.mark.parametrize(
+        ("param_dag_id", "path_params", "query_params", "expected_dag_id"),
+        [
+            ("dag_id_from_param", {}, {}, "dag_id_from_param"),
+            (None, {"dag_id": "dag_id_from_path_params"}, {}, 
"dag_id_from_path_params"),
+            (None, {}, {"dag_id": "dag_id_from_query_params"}, 
"dag_id_from_query_params"),
+            (None, {}, {}, None),
+        ],
+    )
+    @patch.object(DagModel, "get_team_name")
     @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
-    async def test_requires_access_dag_unauthorized(self, 
mock_get_auth_manager):
+    def test_requires_access_dag_unauthorized(
+        self,
+        mock_get_auth_manager,
+        mock_get_team_name,
+        param_dag_id,
+        path_params,
+        query_params,
+        expected_dag_id,
+    ):
         auth_manager = Mock()
         auth_manager.is_authorized_dag.return_value = False
         mock_get_auth_manager.return_value = auth_manager
+        mock_get_team_name.return_value = "team1" if expected_dag_id else None
+
         fastapi_request = Mock()
-        fastapi_request.path_params = {}
-        fastapi_request.query_params = {}
+        fastapi_request.path_params = path_params
+        fastapi_request.query_params = query_params
+        user = Mock()
+
+        with pytest.raises(HTTPException, match="Forbidden"):
+            requires_access_dag("GET", DagAccessEntity.CODE, 
param_dag_id=param_dag_id)(fastapi_request, user)
+
+        auth_manager.is_authorized_dag.assert_called_once_with(
+            method="GET",
+            access_entity=DagAccessEntity.CODE,
+            details=DagDetails(id=expected_dag_id, 
team_name=mock_get_team_name.return_value),
+            user=user,
+        )
 
-        mock_request = Mock()
-        mock_request.path_params.return_value = {}
-        mock_request.query_params.return_value = {}
+    @pytest.mark.db_test
+    @pytest.mark.asyncio
+    @patch.object(DagModel, "get_team_name")
+    @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+    async def test_requires_access_backfill_authorized_from_path(
+        self, mock_get_auth_manager, mock_get_team_name
+    ):
+        """When backfill_id is in path and Backfill exists, dag_id from 
backfill is used."""
+        auth_manager = Mock()
+        auth_manager.is_authorized_dag.return_value = True
+        mock_get_auth_manager.return_value = auth_manager
+        mock_get_team_name.return_value = "team1"
+
+        backfill = Mock()
+        backfill.dag_id = "backfill_dag_id"
+        session = Mock()
+        session.scalars.return_value.one_or_none.return_value = backfill
+
+        request = Mock()
+        request.path_params = {"backfill_id": 42}
+        request.json = AsyncMock(return_value={})
 
+        user = Mock()
+
+        inner = requires_access_backfill("PUT")
+        await inner(request, user, session)
+
+        auth_manager.is_authorized_dag.assert_called_once_with(
+            method="PUT",
+            access_entity=DagAccessEntity.RUN,
+            details=DagDetails(id="backfill_dag_id", team_name="team1"),
+            user=user,
+        )
+
+    @pytest.mark.db_test
+    @pytest.mark.asyncio
+    @patch.object(DagModel, "get_team_name")
+    @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+    async def test_requires_access_backfill_authorized_from_body(
+        self, mock_get_auth_manager, mock_get_team_name
+    ):
+        """When backfill_id is missing or not int, dag_id can come from 
request body (POST backfill)."""
+        auth_manager = Mock()
+        auth_manager.is_authorized_dag.return_value = True
+        mock_get_auth_manager.return_value = auth_manager
+        mock_get_team_name.return_value = "team1"
+
+        session = Mock()
+        session.scalars.return_value.one_or_none.return_value = None
+
+        request = Mock()
+        request.path_params = {}
+        request.json = AsyncMock(return_value={"dag_id": "body_dag_id"})
+
+        user = Mock()
+
+        inner = requires_access_backfill("POST")
+        await inner(request, user, session)
+
+        auth_manager.is_authorized_dag.assert_called_once_with(
+            method="POST",
+            access_entity=DagAccessEntity.RUN,
+            details=DagDetails(id="body_dag_id", team_name="team1"),
+            user=user,
+        )
+
+    @pytest.mark.db_test
+    @pytest.mark.asyncio
+    @patch.object(DagModel, "get_team_name")
+    @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+    async def test_requires_access_backfill_unauthorized(self, 
mock_get_auth_manager, mock_get_team_name):
+        """When is_authorized_dag returns False, Forbidden is raised."""
+        auth_manager = Mock()
+        auth_manager.is_authorized_dag.return_value = False
+        mock_get_auth_manager.return_value = auth_manager
+        mock_get_team_name.return_value = None
+
+        backfill = Mock()
+        backfill.dag_id = "unauthorized_dag"
+        session = Mock()
+        session.scalars.return_value.one_or_none.return_value = backfill
+
+        request = Mock()
+        request.path_params = {"backfill_id": 1}
+        user = Mock()
+
+        inner = requires_access_backfill("GET")
         with pytest.raises(HTTPException, match="Forbidden"):
-            requires_access_dag("GET", DagAccessEntity.CODE)(fastapi_request, 
Mock())
+            await inner(request, user, session)
+
+        auth_manager.is_authorized_dag.assert_called_once_with(
+            method="GET",
+            access_entity=DagAccessEntity.RUN,
+            details=DagDetails(id="unauthorized_dag", team_name=None),
+            user=user,
+        )
+
+    @pytest.mark.db_test
+    @pytest.mark.asyncio
+    @patch.object(DagModel, "get_team_name")
+    @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+    async def 
test_requires_access_backfill_backfill_not_found_falls_back_to_body(
+        self, mock_get_auth_manager, mock_get_team_name
+    ):
+        """When backfill_id is int but Backfill not found, dag_id from body is 
used."""
+        auth_manager = Mock()
+        auth_manager.is_authorized_dag.return_value = True
+        mock_get_auth_manager.return_value = auth_manager
+        mock_get_team_name.return_value = "team1"
+
+        session = Mock()
+        session.scalars.return_value.one_or_none.return_value = None
+
+        request = Mock()
+        request.path_params = {"backfill_id": 999}
+        request.json = AsyncMock(return_value={"dag_id": "fallback_dag_id"})
 
-        auth_manager.is_authorized_dag.assert_called_once()
+        user = Mock()
+
+        inner = requires_access_backfill("POST")
+        await inner(request, user, session)
+
+        auth_manager.is_authorized_dag.assert_called_once_with(
+            method="POST",
+            access_entity=DagAccessEntity.RUN,
+            details=DagDetails(id="fallback_dag_id", team_name="team1"),
+            user=user,
+        )
 
     @pytest.mark.parametrize(
         "url, expected_is_safe",

Reply via email to