This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 090607d92a Feature: Allow set Dag Run resource into Dag Level 
permission (#40703)
090607d92a is described below

commit 090607d92a7995c75b9d25f5324d11a3dae683ce
Author: Joao Amaral <[email protected]>
AuthorDate: Thu Aug 8 14:32:06 2024 -0300

    Feature: Allow set Dag Run resource into Dag Level permission (#40703)
---
 airflow/models/dag.py                              |  38 +++--
 .../providers/fab/auth_manager/fab_auth_manager.py |  41 ++++--
 .../fab/auth_manager/security_manager/override.py  | 158 +++++++++++++--------
 airflow/security/permissions.py                    |  33 ++++-
 airflow/www/views.py                               |  11 +-
 .../auth-manager/access-control.rst                |  14 +-
 newsfragments/40703.feature.rst                    |   1 +
 tests/api_connexion/endpoints/test_dag_endpoint.py |  12 +-
 .../endpoints/test_dag_run_endpoint.py             |  29 +++-
 .../endpoints/test_import_error_endpoint.py        |   8 +-
 tests/dag_processing/test_processor.py             |   2 +-
 tests/models/test_dag.py                           |  16 ++-
 tests/models/test_dagbag.py                        |  38 ++++-
 tests/providers/fab/auth_manager/test_security.py  |  45 ++++--
 tests/serialization/test_dag_serialization.py      |   9 +-
 tests/test_utils/permissions.py                    |  31 ++++
 tests/www/views/test_views_acl.py                  |   6 +-
 tests/www/views/test_views_home.py                 |  11 +-
 18 files changed, 393 insertions(+), 110 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index b5e0d585be..c9380494a0 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -455,6 +455,8 @@ class DAG(LoggingMixin):
         that it is executed when the dag succeeds.
     :param access_control: Specify optional DAG-level actions, e.g.,
         "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 
'can_delete'}}"
+        or it can specify the resource name if there is a DAG Runs resource, 
e.g.,
+        "{'role1': {'DAG Runs': {'can_create'}}, 'role2': {'DAGs': 
{'can_read', 'can_edit', 'can_delete'}}}"
     :param is_paused_upon_creation: Specifies if the dag is paused when 
created for the first time.
         If the dag exists already, this flag will be ignored. If this optional 
parameter
         is not specified, the global config setting will be used.
@@ -544,7 +546,7 @@ class DAG(LoggingMixin):
         on_failure_callback: None | DagStateChangeCallback | 
list[DagStateChangeCallback] = None,
         doc_md: str | None = None,
         params: abc.MutableMapping | None = None,
-        access_control: dict | None = None,
+        access_control: dict[str, dict[str, Collection[str]]] | dict[str, 
Collection[str]] | None = None,
         is_paused_upon_creation: bool | None = None,
         jinja_environment_kwargs: dict | None = None,
         render_template_as_native_obj: bool = False,
@@ -911,21 +913,33 @@ class DAG(LoggingMixin):
         """
         if access_control is None:
             return None
-        new_perm_mapping = {
+        new_dag_perm_mapping = {
             permissions.DEPRECATED_ACTION_CAN_DAG_READ: 
permissions.ACTION_CAN_READ,
             permissions.DEPRECATED_ACTION_CAN_DAG_EDIT: 
permissions.ACTION_CAN_EDIT,
         }
+
+        def update_old_perm(permission: str):
+            new_perm = new_dag_perm_mapping.get(permission, permission)
+            if new_perm != permission:
+                warnings.warn(
+                    f"The '{permission}' permission is deprecated. Please use 
'{new_perm}'.",
+                    RemovedInAirflow3Warning,
+                    stacklevel=3,
+                )
+            return new_perm
+
         updated_access_control = {}
         for role, perms in access_control.items():
-            updated_access_control[role] = {new_perm_mapping.get(perm, perm) 
for perm in perms}
-
-        if access_control != updated_access_control:
-            warnings.warn(
-                "The 'can_dag_read' and 'can_dag_edit' permissions are 
deprecated. "
-                "Please use 'can_read' and 'can_edit', respectively.",
-                RemovedInAirflow3Warning,
-                stacklevel=3,
-            )
+            updated_access_control[role] = updated_access_control.get(role, {})
+            if isinstance(perms, (set, list)):
+                # Support for old-style access_control where only the actions 
are specified
+                updated_access_control[role][permissions.RESOURCE_DAG] = 
set(perms)
+            else:
+                updated_access_control[role] = perms
+            if permissions.RESOURCE_DAG in updated_access_control[role]:
+                updated_access_control[role][permissions.RESOURCE_DAG] = {
+                    update_old_perm(perm) for perm in 
updated_access_control[role][permissions.RESOURCE_DAG]
+                }
 
         return updated_access_control
 
@@ -4162,7 +4176,7 @@ def dag(
     on_failure_callback: None | DagStateChangeCallback | 
list[DagStateChangeCallback] = None,
     doc_md: str | None = None,
     params: abc.MutableMapping | None = None,
-    access_control: dict | None = None,
+    access_control: dict[str, dict[str, Collection[str]]] | dict[str, 
Collection[str]] | None = None,
     is_paused_upon_creation: bool | None = None,
     jinja_environment_kwargs: dict | None = None,
     render_template_as_native_obj: bool = False,
diff --git a/airflow/providers/fab/auth_manager/fab_auth_manager.py 
b/airflow/providers/fab/auth_manager/fab_auth_manager.py
index ffd5e5cab5..344df7588d 100644
--- a/airflow/providers/fab/auth_manager/fab_auth_manager.py
+++ b/airflow/providers/fab/auth_manager/fab_auth_manager.py
@@ -61,7 +61,6 @@ from airflow.security.permissions import (
     RESOURCE_DAG,
     RESOURCE_DAG_CODE,
     RESOURCE_DAG_DEPENDENCIES,
-    RESOURCE_DAG_PREFIX,
     RESOURCE_DAG_RUN,
     RESOURCE_DAG_WARNING,
     RESOURCE_DATASET,
@@ -242,6 +241,8 @@ class FabAuthManager(BaseAuthManager):
 
             return all(
                 self._is_authorized(method=method, 
resource_type=resource_type, user=user)
+                if resource_type != RESOURCE_DAG_RUN or not 
hasattr(permissions, "resource_name")
+                else self._is_authorized_dag_run(method=method, 
details=details, user=user)
                 for resource_type in resource_types
             )
 
@@ -412,7 +413,33 @@ class FabAuthManager(BaseAuthManager):
 
         if details and details.id:
             # Check whether the user has permissions to access a specific DAG
-            resource_dag_name = self._resource_name_for_dag(details.id)
+            resource_dag_name = self._resource_name(details.id, RESOURCE_DAG)
+            return self._is_authorized(method=method, 
resource_type=resource_dag_name, user=user)
+
+        return False
+
+    def _is_authorized_dag_run(
+        self,
+        method: ResourceMethod,
+        details: DagDetails | None = None,
+        user: BaseUser | None = None,
+    ) -> bool:
+        """
+        Return whether the user is authorized to perform a given action on a 
DAG Run.
+
+        :param method: the method to perform
+        :param details: optional, details about the DAG
+        :param user: optional, the user to perform the action on. If not 
provided, it uses the current user
+
+        :meta private:
+        """
+        is_global_authorized = self._is_authorized(method=method, 
resource_type=RESOURCE_DAG_RUN, user=user)
+        if is_global_authorized:
+            return True
+
+        if details and details.id:
+            # Check whether the user holds the DAG Run permission scoped to 
this specific DAG
+            resource_dag_name = self._resource_name(details.id, 
RESOURCE_DAG_RUN)
             return self._is_authorized(method=method, 
resource_type=resource_dag_name, user=user)
 
         return False
@@ -444,7 +471,7 @@ class FabAuthManager(BaseAuthManager):
             raise AirflowException(f"Unknown DAG access entity: 
{dag_access_entity}")
         return _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE[dag_access_entity]
 
-    def _resource_name_for_dag(self, dag_id: str) -> str:
+    def _resource_name(self, dag_id: str, resource_type: str) -> str:
         """
         Return the FAB resource name for a DAG id.
 
@@ -453,11 +480,9 @@ class FabAuthManager(BaseAuthManager):
         :meta private:
         """
         root_dag_id = self._get_root_dag_id(dag_id)
-        if root_dag_id == RESOURCE_DAG:
-            return root_dag_id
-        if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
-            return root_dag_id
-        return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"
+        if hasattr(permissions, "resource_name"):
+            return getattr(permissions, "resource_name")(root_dag_id, 
resource_type)
+        return getattr(permissions, "resource_name_for_dag")(root_dag_id)
 
     @staticmethod
     def _get_user_permissions(user: BaseUser):
diff --git a/airflow/providers/fab/auth_manager/security_manager/override.py 
b/airflow/providers/fab/auth_manager/security_manager/override.py
index fd0dce0b62..e2208e5fb4 100644
--- a/airflow/providers/fab/auth_manager/security_manager/override.py
+++ b/airflow/providers/fab/auth_manager/security_manager/override.py
@@ -344,7 +344,16 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
     ]
 
     # global resource for dag-level access
-    DAG_ACTIONS = permissions.DAG_ACTIONS
+    RESOURCE_DETAILS_MAP = getattr(
+        permissions,
+        "RESOURCE_DETAILS_MAP",
+        {
+            permissions.RESOURCE_DAG: {
+                "actions": permissions.DAG_ACTIONS,
+            }
+        },
+    )
+    DAG_ACTIONS = RESOURCE_DETAILS_MAP[permissions.RESOURCE_DAG]["actions"]
 
     def __init__(self, appbuilder):
         # done in super, but we need it before we can call super.
@@ -1031,7 +1040,7 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         """Check if user has read or write access to some dags."""
         if dag_id and dag_id != "~":
             root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, 
permissions.resource_name_for_dag(root_dag_id))
+            return self.has_access(action, self._resource_name(root_dag_id, 
permissions.RESOURCE_DAG))
 
         user = g.user
         if action == permissions.ACTION_CAN_READ:
@@ -1065,24 +1074,25 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
 
         for dag in dags:
             root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else 
dag.dag_id
-            dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-            for action_name in self.DAG_ACTIONS:
-                if (action_name, dag_resource_name) not in perms:
-                    self._merge_perm(action_name, dag_resource_name)
+            for resource_name, resource_values in 
self.RESOURCE_DETAILS_MAP.items():
+                dag_resource_name = self._resource_name(root_dag_id, 
resource_name)
+                for action_name in resource_values["actions"]:
+                    if (action_name, dag_resource_name) not in perms:
+                        self._merge_perm(action_name, dag_resource_name)
 
             if dag.access_control is not None:
-                self.sync_perm_for_dag(dag_resource_name, dag.access_control)
+                self.sync_perm_for_dag(root_dag_id, dag.access_control)
 
     def prefixed_dag_id(self, dag_id: str) -> str:
         """Return the permission name for a DAG id."""
         warnings.warn(
             "`prefixed_dag_id` has been deprecated. "
-            "Please use `airflow.security.permissions.resource_name_for_dag` 
instead.",
+            "Please use `airflow.security.permissions.resource_name` instead.",
             RemovedInAirflow3Warning,
             stacklevel=2,
         )
         root_dag_id = self._get_root_dag_id(dag_id)
-        return permissions.resource_name_for_dag(root_dag_id)
+        return self._resource_name(root_dag_id, permissions.RESOURCE_DAG)
 
     def is_dag_resource(self, resource_name: str) -> bool:
         """Determine if a resource belongs to a DAG or all DAGs."""
@@ -1093,7 +1103,7 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
     def sync_perm_for_dag(
         self,
         dag_id: str,
-        access_control: dict[str, Collection[str]] | None = None,
+        access_control: dict[str, dict[str, Collection[str]]] | None = None,
     ) -> None:
         """
         Sync permissions for given dag id.
@@ -1101,62 +1111,78 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         The dag id surely exists in our dag bag as only / refresh button or 
DagBag will call this function.
 
         :param dag_id: the ID of the DAG whose permissions should be updated
-        :param access_control: a dict where each key is a role name and
-            each value is a set() of action names (e.g.,
-            {'can_read'}
+        :param access_control: a dict where each key is a role name and each 
value can be:
+             - a set() of DAGs resource action names (e.g. `{'can_read'}`)
+             - or a dict where each key is a resource name ('DAGs' or 'DAG 
Runs') and each value
+             is a set() of action names (e.g., `{'DAG Runs': {'can_create'}, 
'DAGs': {'can_read'}}`)
         :return:
         """
-        dag_resource_name = permissions.resource_name_for_dag(dag_id)
-        for dag_action_name in self.DAG_ACTIONS:
-            self.create_permission(dag_action_name, dag_resource_name)
+        for resource_name, resource_values in 
self.RESOURCE_DETAILS_MAP.items():
+            dag_resource_name = self._resource_name(dag_id, resource_name)
+            for dag_action_name in resource_values["actions"]:
+                self.create_permission(dag_action_name, dag_resource_name)
 
         if access_control is not None:
-            self.log.debug("Syncing DAG-level permissions for DAG '%s'", 
dag_resource_name)
-            self._sync_dag_view_permissions(dag_resource_name, access_control)
+            self.log.debug("Syncing DAG-level permissions for DAG '%s'", 
dag_id)
+            self._sync_dag_view_permissions(dag_id, access_control.copy())
         else:
             self.log.debug(
                 "Not syncing DAG-level permissions for DAG '%s' as access 
control is unset.",
-                dag_resource_name,
+                dag_id,
             )
 
-    def _sync_dag_view_permissions(self, dag_id: str, access_control: 
dict[str, Collection[str]]) -> None:
+    def _resource_name(self, dag_id: str, resource_name: str) -> str:
+        """
+        Get the resource name from permissions.
+
+        This method is to keep compatibility with new FAB versions
+        running with old airflow versions.
+        """
+        if hasattr(permissions, "resource_name"):
+            return getattr(permissions, "resource_name")(dag_id, resource_name)
+        return getattr(permissions, "resource_name_for_dag")(dag_id)
+
+    def _sync_dag_view_permissions(
+        self,
+        dag_id: str,
+        access_control: dict[str, dict[str, Collection[str]]],
+    ) -> None:
         """
         Set the access policy on the given DAG's ViewModel.
 
         :param dag_id: the ID of the DAG whose permissions should be updated
-        :param access_control: a dict where each key is a role name and
-            each value is a set() of action names (e.g. {'can_read'})
+        :param access_control: a dict where each key is a role name and each 
value is:
+            - a dict where each key is a resource name ('DAGs' or 'DAG Runs') 
and each value
+            is a set() of action names (e.g., `{'DAG Runs': {'can_create'}, 
'DAGs': {'can_read'}}`)
         """
-        dag_resource_name = permissions.resource_name_for_dag(dag_id)
 
-        def _get_or_create_dag_permission(action_name: str) -> Permission | 
None:
+        def _get_or_create_dag_permission(action_name: str, dag_resource_name: 
str) -> Permission | None:
             perm = self.get_permission(action_name, dag_resource_name)
             if not perm:
                 self.log.info("Creating new action '%s' on resource '%s'", 
action_name, dag_resource_name)
                 perm = self.create_permission(action_name, dag_resource_name)
-
             return perm
 
-        def _revoke_stale_permissions(resource: Resource):
-            existing_dag_perms = self.get_resource_permissions(resource)
-            for perm in existing_dag_perms:
-                non_admin_roles = [role for role in perm.role if role.name != 
"Admin"]
-                for role in non_admin_roles:
-                    target_perms_for_role = access_control.get(role.name, ())
-                    if perm.action.name not in target_perms_for_role:
-                        self.log.info(
-                            "Revoking '%s' on DAG '%s' for role '%s'",
-                            perm.action,
-                            dag_resource_name,
-                            role.name,
-                        )
-                        self.remove_permission_from_role(role, perm)
-
-        resource = self.get_resource(dag_resource_name)
-        if resource:
-            _revoke_stale_permissions(resource)
-
-        for rolename, action_names in access_control.items():
+        # Revoking stale permissions for all possible DAG level resources
+        for resource_name in self.RESOURCE_DETAILS_MAP.keys():
+            dag_resource_name = self._resource_name(dag_id, resource_name)
+            if resource := self.get_resource(dag_resource_name):
+                existing_dag_perms = self.get_resource_permissions(resource)
+                for perm in existing_dag_perms:
+                    non_admin_roles = [role for role in perm.role if role.name 
!= "Admin"]
+                    for role in non_admin_roles:
+                        target_perms_for_role = access_control.get(role.name, 
{}).get(resource_name, set())
+                        if perm.action.name not in target_perms_for_role:
+                            self.log.info(
+                                "Revoking '%s' on DAG '%s' for role '%s'",
+                                perm.action,
+                                dag_resource_name,
+                                role.name,
+                            )
+                            self.remove_permission_from_role(role, perm)
+
+        # Adding the access control permissions
+        for rolename, resource_actions in access_control.items():
             role = self.find_role(rolename)
             if not role:
                 raise AirflowException(
@@ -1164,19 +1190,34 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
                     f"'{rolename}', but that role does not exist"
                 )
 
-            action_names = set(action_names)
-            invalid_action_names = action_names - self.DAG_ACTIONS
-            if invalid_action_names:
-                raise AirflowException(
-                    f"The access_control map for DAG '{dag_resource_name}' 
includes "
-                    f"the following invalid permissions: 
{invalid_action_names}; "
-                    f"The set of valid permissions is: {self.DAG_ACTIONS}"
-                )
+            if isinstance(resource_actions, (set, list)):
+                # Support for old-style access_control where only the actions 
are specified
+                resource_actions = {permissions.RESOURCE_DAG: 
set(resource_actions)}
+
+            for resource_name, actions in resource_actions.items():
+                if resource_name not in self.RESOURCE_DETAILS_MAP:
+                    raise AirflowException(
+                        f"The access_control map for DAG '{dag_id}' includes 
the following invalid "
+                        f"resource name: '{resource_name}'; "
+                        f"The set of valid resource names is: 
{self.RESOURCE_DETAILS_MAP.keys()}"
+                    )
+
+                dag_resource_name = self._resource_name(dag_id, resource_name)
+                self.log.debug("Syncing DAG-level permissions for DAG '%s'", 
dag_resource_name)
+
+                invalid_actions = set(actions) - 
self.RESOURCE_DETAILS_MAP[resource_name]["actions"]
+
+                if invalid_actions:
+                    raise AirflowException(
+                        f"The access_control map for DAG '{dag_resource_name}' 
includes "
+                        f"the following invalid permissions: 
{invalid_actions}; "
+                        f"The set of valid permissions is: 
{self.RESOURCE_DETAILS_MAP[resource_name]['actions']}"
+                    )
 
-            for action_name in action_names:
-                dag_perm = _get_or_create_dag_permission(action_name)
-                if dag_perm:
-                    self.add_permission_to_role(role, dag_perm)
+                for action_name in actions:
+                    dag_perm = _get_or_create_dag_permission(action_name, 
dag_resource_name)
+                    if dag_perm:
+                        self.add_permission_to_role(role, dag_perm)
 
     def add_permissions_view(self, base_action_names, resource_name):  # Keep 
name for compatibility with FAB.
         """
@@ -1307,8 +1348,9 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         Add the missing ones to the table for admin.
         """
         session = self.appbuilder.get_session
+        prefixes = getattr(permissions, "PREFIX_LIST", 
[permissions.RESOURCE_DAG_PREFIX])
         dag_resources = session.scalars(
-            
select(Resource).where(Resource.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+            select(Resource).where(or_(*[Resource.name.like(f"{prefix}%") for 
prefix in prefixes]))
         )
         resource_ids = [resource.id for resource in dag_resources]
 
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
index e333876184..058cde2927 100644
--- a/airflow/security/permissions.py
+++ b/airflow/security/permissions.py
@@ -16,6 +16,8 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TypedDict
+
 # Resource Constants
 RESOURCE_ACTION = "Permissions"
 RESOURCE_ADMIN_MENU = "Admin"
@@ -28,6 +30,7 @@ RESOURCE_DAG_CODE = "DAG Code"
 RESOURCE_DAG_DEPENDENCIES = "DAG Dependencies"
 RESOURCE_DAG_PREFIX = "DAG:"
 RESOURCE_DAG_RUN = "DAG Runs"
+RESOURCE_DAG_RUN_PREFIX = "DAG Run:"
 RESOURCE_DAG_WARNING = "DAG Warnings"
 RESOURCE_CLUSTER_ACTIVITY = "Cluster Activity"
 RESOURCE_DATASET = "Datasets"
@@ -64,10 +67,32 @@ ACTION_CAN_ACCESS_MENU = "menu_access"
 DEPRECATED_ACTION_CAN_DAG_READ = "can_dag_read"
 DEPRECATED_ACTION_CAN_DAG_EDIT = "can_dag_edit"
 
+
+class ResourceDetails(TypedDict):
+    """Details of a resource (actions and prefix)."""
+
+    actions: set[str]
+    prefix: str
+
+
+# Keeping DAG_ACTIONS to keep the compatibility with outdated versions of FAB 
provider
 DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}
 
 
-def resource_name_for_dag(root_dag_id: str) -> str:
+RESOURCE_DETAILS_MAP = {
+    RESOURCE_DAG: ResourceDetails(
+        actions={ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}, 
prefix=RESOURCE_DAG_PREFIX
+    ),
+    RESOURCE_DAG_RUN: ResourceDetails(
+        actions={ACTION_CAN_READ, ACTION_CAN_CREATE, ACTION_CAN_DELETE, 
ACTION_CAN_ACCESS_MENU},
+        prefix=RESOURCE_DAG_RUN_PREFIX,
+    ),
+}
+PREFIX_LIST = [details["prefix"] for details in RESOURCE_DETAILS_MAP.values()]
+PREFIX_RESOURCES_MAP = {details["prefix"]: resource for resource, details in 
RESOURCE_DETAILS_MAP.items()}
+
+
+def resource_name(root_dag_id: str, resource: str) -> str:
     """
     Return the resource name for a DAG id.
 
@@ -75,8 +100,8 @@ def resource_name_for_dag(root_dag_id: str) -> str:
     parent DAG, you should pass ``DagModel.root_dag_id`` to this function,
     for a subdag. A normal dag should pass the ``DagModel.dag_id``.
     """
-    if root_dag_id == RESOURCE_DAG:
+    if root_dag_id in RESOURCE_DETAILS_MAP.keys():
         return root_dag_id
-    if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
+    if root_dag_id.startswith(tuple(PREFIX_RESOURCES_MAP.keys())):
         return root_dag_id
-    return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"
+    return f"{RESOURCE_DETAILS_MAP[resource]['prefix']}{root_dag_id}"
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bfa7b3d177..236beed451 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -996,9 +996,6 @@ class Airflow(AirflowBaseView):
                 .unique()
                 .all()
             )
-            can_create_dag_run = get_auth_manager().is_authorized_dag(
-                method="POST", access_entity=DagAccessEntity.RUN, user=g.user
-            )
 
             dataset_triggered_dag_ids = {dag.dag_id for dag in dags if 
dag.schedule_interval == "Dataset"}
             if dataset_triggered_dag_ids:
@@ -1013,6 +1010,12 @@ class Airflow(AirflowBaseView):
                 dag.can_edit = get_auth_manager().is_authorized_dag(
                     method="PUT", details=DagDetails(id=dag.dag_id), 
user=g.user
                 )
+                can_create_dag_run = get_auth_manager().is_authorized_dag(
+                    method="POST",
+                    access_entity=DagAccessEntity.RUN,
+                    details=DagDetails(id=dag.dag_id),
+                    user=g.user,
+                )
                 dag.can_trigger = dag.can_edit and can_create_dag_run
                 dag.can_delete = get_auth_manager().is_authorized_dag(
                     method="DELETE", details=DagDetails(id=dag.dag_id), 
user=g.user
@@ -5742,7 +5745,7 @@ def add_user_permissions_to_dag(sender, template, 
context, **extra):
         return
     dag = context["dag"]
     can_create_dag_run = get_auth_manager().is_authorized_dag(
-        method="POST", access_entity=DagAccessEntity.RUN
+        method="POST", access_entity=DagAccessEntity.RUN, 
details=DagDetails(id=dag.dag_id)
     )
 
     dag.can_edit = get_auth_manager().is_authorized_dag(method="PUT", 
details=DagDetails(id=dag.dag_id))
diff --git a/docs/apache-airflow-providers-fab/auth-manager/access-control.rst 
b/docs/apache-airflow-providers-fab/auth-manager/access-control.rst
index 4d5637d40d..b71f08f557 100644
--- a/docs/apache-airflow-providers-fab/auth-manager/access-control.rst
+++ b/docs/apache-airflow-providers-fab/auth-manager/access-control.rst
@@ -138,7 +138,7 @@ There are five default roles: Public, Viewer, User, Op, and 
Admin. Each one has
 DAG-level permissions
 ^^^^^^^^^^^^^^^^^^^^^
 
-For DAG-level permissions exclusively, access can be controlled at the level 
of all DAGs or individual DAG objects. This includes ``DAGs.can_read``, 
``DAGs.can_edit``, and ``DAGs.can_delete``. When these permissions are listed, 
access is granted to users who either have the listed permission or the same 
permission for the specific DAG being acted upon. For individual DAGs, the 
resource name is ``DAG:`` + the DAG ID.
+For DAG-level permissions exclusively, access can be controlled at the level 
of all DAGs or individual DAG objects. This includes ``DAGs.can_read``, 
``DAGs.can_edit``, ``DAGs.can_delete``, ``DAG Runs.can_read``, ``DAG 
Runs.can_create``, ``DAG Runs.can_delete``, and ``DAG Runs.menu_access``. When 
these permissions are listed, access is granted to users who either have the 
listed permission or the same permission for the specific DAG being acted upon. 
For individual DAGs, the resource name [...]
 
 For example, if a user is trying to view DAG information for the 
``example_dag_id``, and the endpoint requires ``DAGs.can_read`` access, access 
will be granted if the user has either ``DAGs.can_read`` or 
``DAG:example_dag_id.can_read`` access.
 
@@ -322,6 +322,18 @@ Setting ``access_control`` on a DAG will overwrite any 
previously existing DAG-l
         },
     )
 
+It's also possible to add DAG Runs resource permissions in a similar way, but 
explicitly adding the resource name to identify which resource the permissions 
are for:
+
+.. code-block:: python
+
+    DAG(
+        dag_id="example_fine_grained_access",
+        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+        access_control={
+            "Viewer": {"DAGs": {"can_edit", "can_read", "can_delete"}, "DAG 
Runs": {"can_create"}},
+        },
+    )
+
 This also means that setting ``access_control={}`` will wipe any existing 
DAG-level permissions for a given DAG from the DB:
 
 .. code-block:: python
diff --git a/newsfragments/40703.feature.rst b/newsfragments/40703.feature.rst
new file mode 100644
index 0000000000..4fd2fddf7e
--- /dev/null
+++ b/newsfragments/40703.feature.rst
@@ -0,0 +1 @@
+Allow setting Dag Run resource permissions at the Dag level: extends Dag's 
access_control feature to allow Dag Run resource permissions.
diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py 
b/tests/api_connexion/endpoints/test_dag_endpoint.py
index 283bc3457c..a546e6bec4 100644
--- a/tests/api_connexion/endpoints/test_dag_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_endpoint.py
@@ -69,11 +69,19 @@ def configured_app(minimal_app_for_api):
     create_user(app, username="test_granular_permissions", 
role_name="TestGranularDag")  # type: ignore
     app.appbuilder.sm.sync_perm_for_dag(  # type: ignore
         "TEST_DAG_1",
-        access_control={"TestGranularDag": [permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ]},
+        access_control={
+            "TestGranularDag": {
+                permissions.RESOURCE_DAG: {permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ}
+            },
+        },
     )
     app.appbuilder.sm.sync_perm_for_dag(  # type: ignore
         "TEST_DAG_1",
-        access_control={"TestGranularDag": [permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ]},
+        access_control={
+            "TestGranularDag": {
+                permissions.RESOURCE_DAG: {permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ}
+            },
+        },
     )
 
     with DAG(
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py 
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 1e17548b09..dc77648784 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -62,6 +62,20 @@ def configured_app(minimal_app_for_api):
             (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
         ],
     )
+    create_user(
+        app,  # type: ignore
+        username="test_no_dag_run_create_permission",
+        role_name="TestNoDagRunCreatePermission",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DATASET),
+            (permissions.ACTION_CAN_READ, 
permissions.RESOURCE_CLUSTER_ACTIVITY),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
+        ],
+    )
     create_user(
         app,  # type: ignore
         username="test_dag_view_only",
@@ -91,7 +105,10 @@ def configured_app(minimal_app_for_api):
     )
     app.appbuilder.sm.sync_perm_for_dag(  # type: ignore
         "TEST_DAG_ID",
-        access_control={"TestGranularDag": [permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ]},
+        access_control={
+            "TestGranularDag": {permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ},
+            "TestNoDagRunCreatePermission": {permissions.RESOURCE_DAG_RUN: 
{permissions.ACTION_CAN_CREATE}},
+        },
     )
     create_user(app, username="test_no_permissions", 
role_name="TestNoPermissions")  # type: ignore
 
@@ -102,6 +119,7 @@ def configured_app(minimal_app_for_api):
     delete_user(app, username="test_view_dags")  # type: ignore
     delete_user(app, username="test_granular_permissions")  # type: ignore
     delete_user(app, username="test_no_permissions")  # type: ignore
+    delete_user(app, username="test_no_dag_run_create_permission")  # type: 
ignore
     delete_roles(app)
 
 
@@ -1331,6 +1349,15 @@ class TestPostDagRun(TestDagRunEndpoint):
         assert response.status_code == 400
         assert "Invalid input for param" in response.json["detail"]
 
+    def test_dagrun_trigger_with_dag_level_permissions(self):
+        self._create_dag("TEST_DAG_ID")
+        response = self.client.post(
+            "api/v1/dags/TEST_DAG_ID/dagRuns",
+            json={"conf": {"validated_number": 1}},
+            environ_overrides={"REMOTE_USER": 
"test_no_dag_run_create_permission"},
+        )
+        assert response.status_code == 200
+
     
@mock.patch("airflow.api_connexion.endpoints.dag_run_endpoint.get_airflow_app")
     def test_dagrun_creation_exception_is_handled(self, mock_get_app, session):
         self._create_dag("TEST_DAG_ID")
diff --git a/tests/api_connexion/endpoints/test_import_error_endpoint.py 
b/tests/api_connexion/endpoints/test_import_error_endpoint.py
index 4483646ce5..635e159bb2 100644
--- a/tests/api_connexion/endpoints/test_import_error_endpoint.py
+++ b/tests/api_connexion/endpoints/test_import_error_endpoint.py
@@ -29,6 +29,7 @@ from tests.test_utils.api_connexion_utils import assert_401, 
create_user, delete
 from tests.test_utils.compat import ParseImportError
 from tests.test_utils.config import conf_vars
 from tests.test_utils.db import clear_db_dags, clear_db_import_errors
+from tests.test_utils.permissions import _resource_name
 
 pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
 
@@ -60,7 +61,12 @@ def configured_app(minimal_app_for_api):
         [
             {
                 "role": "TestSingleDAG",
-                "perms": [(permissions.ACTION_CAN_READ, 
permissions.resource_name_for_dag(TEST_DAG_IDS[0]))],
+                "perms": [
+                    (
+                        permissions.ACTION_CAN_READ,
+                        _resource_name(TEST_DAG_IDS[0], 
permissions.RESOURCE_DAG),
+                    )
+                ],
             }
         ]
     )
diff --git a/tests/dag_processing/test_processor.py 
b/tests/dag_processing/test_processor.py
index 0893072d0b..c2ff159ea4 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -1035,7 +1035,7 @@ class TestDagFileProcessor:
 
         with create_session() as session:
             with assert_queries_count(
-                expected_count=94,
+                expected_count=154,
                 margin=10,
                 session=session,
             ):
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index cc628e6b8a..376d5c5beb 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -2788,19 +2788,27 @@ my_postgres_conn:
         outdated_permissions = {
             "role1": {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT},
             "role2": {permissions.DEPRECATED_ACTION_CAN_DAG_READ, 
permissions.DEPRECATED_ACTION_CAN_DAG_EDIT},
+            "role3": {permissions.RESOURCE_DAG_RUN: 
{permissions.ACTION_CAN_CREATE}},
         }
         updated_permissions = {
-            "role1": {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT},
-            "role2": {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT},
+            "role1": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT}},
+            "role2": {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT}},
+            "role3": {permissions.RESOURCE_DAG_RUN: 
{permissions.ACTION_CAN_CREATE}},
         }
 
-        with pytest.warns(DeprecationWarning):
+        with pytest.warns(DeprecationWarning) as deprecation_warnings:
             dag = DAG(dag_id="dag_with_outdated_perms", 
access_control=outdated_permissions)
         assert dag.access_control == updated_permissions
+        assert len(deprecation_warnings) == 2
+        assert "permission is deprecated" in 
str(deprecation_warnings[0].message)
+        assert "permission is deprecated" in 
str(deprecation_warnings[1].message)
 
-        with pytest.warns(DeprecationWarning):
+        with pytest.warns(DeprecationWarning) as deprecation_warnings:
             dag.access_control = outdated_permissions
         assert dag.access_control == updated_permissions
+        assert len(deprecation_warnings) == 2
+        assert "permission is deprecated" in 
str(deprecation_warnings[0].message)
+        assert "permission is deprecated" in 
str(deprecation_warnings[1].message)
 
     def test_validate_executor_field_executor_not_configured(self):
         dag = DAG(
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 5101dde5fc..836ede04df 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -965,7 +965,43 @@ class TestDagBag:
             dag.access_control = {"Public": {"can_read"}}
             _sync_perms()
             mock_sync_perm_for_dag.assert_called_once_with(
-                "test_example_bash_operator", {"Public": {"can_read"}}
+                "test_example_bash_operator", {"Public": {"DAGs": 
{"can_read"}}}
+            )
+
+    @patch("airflow.www.security_appless.ApplessAirflowSecurityManager")
+    def test_sync_perm_for_dag_with_dict_access_control(self, 
mock_security_manager):
+        """
+        Test that dagbag._sync_perm_for_dag will call 
ApplessAirflowSecurityManager.sync_perm_for_dag
+        """
+        db_clean_up()
+        with create_session() as session:
+            security_manager = ApplessAirflowSecurityManager(session)
+            mock_sync_perm_for_dag = 
mock_security_manager.return_value.sync_perm_for_dag
+            mock_sync_perm_for_dag.side_effect = 
security_manager.sync_perm_for_dag
+
+            dagbag = DagBag(
+                dag_folder=os.path.join(TEST_DAGS_FOLDER, 
"test_example_bash_operator.py"),
+                include_examples=False,
+            )
+            dag = dagbag.dags["test_example_bash_operator"]
+
+            def _sync_perms():
+                mock_sync_perm_for_dag.reset_mock()
+                DagBag._sync_perm_for_dag(dag, session=session)
+
+            # perms don't exist
+            _sync_perms()
+            
mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator", 
None)
+
+            # perms now exist
+            _sync_perms()
+            
mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator", 
None)
+
+            # Always sync if we have access_control
+            dag.access_control = {"Public": {"DAGs": {"can_read"}, "DAG Runs": 
{"can_create"}}}
+            _sync_perms()
+            mock_sync_perm_for_dag.assert_called_once_with(
+                "test_example_bash_operator", {"Public": {"DAGs": 
{"can_read"}, "DAG Runs": {"can_create"}}}
             )
 
     
@patch("airflow.models.dagbag.settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL", 5)
diff --git a/tests/providers/fab/auth_manager/test_security.py 
b/tests/providers/fab/auth_manager/test_security.py
index d3238ba0f1..5ff7f34d01 100644
--- a/tests/providers/fab/auth_manager/test_security.py
+++ b/tests/providers/fab/auth_manager/test_security.py
@@ -59,11 +59,12 @@ from tests.test_utils.api_connexion_utils import (
 from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 from tests.test_utils.mock_security_manager import MockSecurityManager
+from tests.test_utils.permissions import _resource_name
 
 pytestmark = pytest.mark.db_test
 
-READ_WRITE = {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}
-READ_ONLY = {permissions.ACTION_CAN_READ}
+READ_WRITE = {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT}}
+READ_ONLY = {permissions.RESOURCE_DAG: {permissions.ACTION_CAN_READ}}
 
 logging.basicConfig(format="%(asctime)s:%(levelname)s:%(name)s:%(message)s")
 logging.getLogger().setLevel(logging.DEBUG)
@@ -108,7 +109,7 @@ def _clear_db_dag_and_runs():
 
 
 def _delete_dag_permissions(dag_id, security_manager):
-    dag_resource_name = permissions.resource_name_for_dag(dag_id)
+    dag_resource_name = _resource_name(dag_id, permissions.RESOURCE_DAG)
     for dag_action_name in security_manager.DAG_ACTIONS:
         security_manager.delete_permission(dag_action_name, dag_resource_name)
 
@@ -751,6 +752,19 @@ def 
test_access_control_with_non_existent_role(security_manager):
     assert "role does not exist" in str(ctx.value)
 
 
+def test_access_control_with_non_allowed_resource(security_manager):
+    with pytest.raises(AirflowException) as ctx:
+        security_manager._sync_dag_view_permissions(
+            dag_id="access-control-test",
+            access_control={
+                "Public": {
+                    permissions.RESOURCE_POOL: {permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ}
+                }
+            },
+        )
+    assert "invalid resource name" in str(ctx.value)
+
+
 def test_all_dag_access_doesnt_give_non_dag_access(app, security_manager):
     username = "dag_access_user"
     role_name = "dag_access_role"
@@ -790,6 +804,17 @@ def test_access_control_with_invalid_permission(app, 
security_manager):
                 )
             assert "invalid permissions" in str(ctx.value)
 
+            with pytest.raises(AirflowException) as ctx:
+                security_manager._sync_dag_view_permissions(
+                    "access_control_test",
+                    access_control={rolename: {permissions.RESOURCE_DAG_RUN: 
{action}}},
+                )
+            if hasattr(permissions, "resource_name"):
+                assert "invalid permission" in str(ctx.value)
+            else:
+                # Test with old airflow running new FAB
+                assert "invalid resource name" in str(ctx.value)
+
 
 def test_access_control_is_set_on_init(
     app,
@@ -890,7 +915,11 @@ def 
test_correct_roles_have_perms_to_read_config(security_manager):
 
 
 def test_create_dag_specific_permissions(session, security_manager, 
monkeypatch, sample_dags):
-    access_control = {"Public": {permissions.ACTION_CAN_READ}}
+    access_control = (
+        {"Public": {"DAGs": {permissions.ACTION_CAN_READ}}}
+        if hasattr(permissions, "resource_name")
+        else {"Public": {permissions.ACTION_CAN_READ}}
+    )
 
     collect_dags_from_db_mock = mock.Mock()
     dagbag_mock = mock.Mock()
@@ -906,7 +935,7 @@ def test_create_dag_specific_permissions(session, 
security_manager, monkeypatch,
     security_manager._sync_dag_view_permissions = mock.Mock()
 
     for dag in sample_dags:
-        dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
+        dag_resource_name = _resource_name(dag.dag_id, 
permissions.RESOURCE_DAG)
         all_perms = security_manager.get_all_permissions()
         assert ("can_read", dag_resource_name) not in all_perms
         assert ("can_edit", dag_resource_name) not in all_perms
@@ -917,13 +946,13 @@ def test_create_dag_specific_permissions(session, 
security_manager, monkeypatch,
     collect_dags_from_db_mock.assert_called_once_with()
 
     for dag in sample_dags:
-        dag_resource_name = permissions.resource_name_for_dag(dag.dag_id)
+        dag_resource_name = _resource_name(dag.dag_id, 
permissions.RESOURCE_DAG)
         all_perms = security_manager.get_all_permissions()
         assert ("can_read", dag_resource_name) in all_perms
         assert ("can_edit", dag_resource_name) in all_perms
 
     security_manager._sync_dag_view_permissions.assert_called_once_with(
-        permissions.resource_name_for_dag("has_access_control"),
+        "has_access_control",
         access_control,
     )
 
@@ -973,7 +1002,7 @@ def test_prefixed_dag_id_is_deprecated(security_manager):
         DeprecationWarning,
         match=(
             "`prefixed_dag_id` has been deprecated. "
-            "Please use `airflow.security.permissions.resource_name_for_dag` 
instead."
+            "Please use `airflow.security.permissions.resource_name` instead."
         ),
     ):
         security_manager.prefixed_dag_id("hello")
diff --git a/tests/serialization/test_dag_serialization.py 
b/tests/serialization/test_dag_serialization.py
index cefc5b7a2d..e9c8ceaf03 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -238,8 +238,13 @@ serialized_simple_dag_ground_truth = {
             "__type": "dict",
             "__var": {
                 "test_role": {
-                    "__type": "set",
-                    "__var": [permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT],
+                    "__type": "dict",
+                    "__var": {
+                        "DAGs": {
+                            "__type": "set",
+                            "__var": [permissions.ACTION_CAN_READ, 
permissions.ACTION_CAN_EDIT],
+                        }
+                    },
                 }
             },
         },
diff --git a/tests/test_utils/permissions.py b/tests/test_utils/permissions.py
new file mode 100644
index 0000000000..46e964c1f6
--- /dev/null
+++ b/tests/test_utils/permissions.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from airflow.security import permissions
+
+
+def _resource_name(dag_id: str, resource_name: str) -> str:
+    """
+    Delegate to ``permissions.resource_name`` when it exists, else fall back to
+    ``resource_name_for_dag``, so new FAB versions work with old Airflow versions.
+    """
+    if hasattr(permissions, "resource_name"):
+        return getattr(permissions, "resource_name")(dag_id, resource_name)
+    if resource_name == permissions.RESOURCE_DAG:
+        return getattr(permissions, "resource_name_for_dag")(dag_id)
+    raise Exception("Only DAG resource is supported in this Airflow version.")
diff --git a/tests/www/views/test_views_acl.py 
b/tests/www/views/test_views_acl.py
index 1f0dd0d072..17700749f6 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -32,6 +32,7 @@ from airflow.utils.types import DagRunType
 from airflow.www.views import FILTER_STATUS_COOKIE
 from tests.test_utils.api_connexion_utils import create_user_scope
 from tests.test_utils.db import clear_db_runs
+from tests.test_utils.permissions import _resource_name
 from tests.test_utils.www import check_content_in_response, 
check_content_not_in_response, client_with_login
 
 pytestmark = pytest.mark.db_test
@@ -883,7 +884,10 @@ def user_dag_level_access_with_ti_edit(acl_app):
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
-            (permissions.ACTION_CAN_EDIT, 
permissions.resource_name_for_dag("example_bash_operator")),
+            (
+                permissions.ACTION_CAN_EDIT,
+                _resource_name("example_bash_operator", 
permissions.RESOURCE_DAG),
+            ),
         ],
     ) as user:
         yield user
diff --git a/tests/www/views/test_views_home.py 
b/tests/www/views/test_views_home.py
index 275c585170..f769311287 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -29,6 +29,7 @@ from airflow.www.utils import UIAlert
 from airflow.www.views import FILTER_LASTRUN_COOKIE, FILTER_STATUS_COOKIE, 
FILTER_TAGS_COOKIE
 from tests.test_utils.api_connexion_utils import create_user
 from tests.test_utils.db import clear_db_dags, clear_db_import_errors, 
clear_db_serialized_dags
+from tests.test_utils.permissions import _resource_name
 from tests.test_utils.www import check_content_in_response, 
check_content_not_in_response, client_with_login
 
 pytestmark = pytest.mark.db_test
@@ -147,7 +148,10 @@ def user_single_dag(app):
         permissions=[
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
-            (permissions.ACTION_CAN_READ, 
permissions.resource_name_for_dag(TEST_FILTER_DAG_IDS[0])),
+            (
+                permissions.ACTION_CAN_READ,
+                _resource_name(TEST_FILTER_DAG_IDS[0], 
permissions.RESOURCE_DAG),
+            ),
         ],
     )
 
@@ -172,7 +176,10 @@ def user_single_dag_edit(app):
         permissions=[
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
-            (permissions.ACTION_CAN_EDIT, 
permissions.resource_name_for_dag("filter_test_1")),
+            (
+                permissions.ACTION_CAN_EDIT,
+                _resource_name("filter_test_1", permissions.RESOURCE_DAG),
+            ),
         ],
     )
 

Reply via email to