uranusjr commented on code in PR #33213:
URL: https://github.com/apache/airflow/pull/33213#discussion_r1318200546


##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -87,35 +131,231 @@ def is_logged_in(self) -> bool:
         """Return whether the user is logged in."""
         return not self.get_user().is_anonymous
 
+    def is_authorized_configuration(self, *, method: ResourceMethod, user: 
BaseUser | None = None) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_CONFIG, user=user)
+
+    def is_authorized_cluster_activity(self, *, method: ResourceMethod, user: 
BaseUser | None = None) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_CLUSTER_ACTIVITY, user=user)
+
+    def is_authorized_connection(
+        self,
+        *,
+        method: ResourceMethod,
+        connection_details: ConnectionDetails | None = None,
+        user: BaseUser | None = None,
+    ) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_CONNECTION, user=user)
+
+    def is_authorized_dag(
+        self,
+        *,
+        method: ResourceMethod,
+        dag_access_entity: DagAccessEntity | None = None,
+        dag_details: DagDetails | None = None,
+        user: BaseUser | None = None,
+    ) -> bool:
+        """
+        Return whether the user is authorized to access the dag.
+
+        There are multiple scenarios:
+
+        1. ``dag_access_entity`` is not provided which means the user wants to access 
the DAG itself and not a sub
+        entity (e.g. DAG runs).
+        2. ``dag_access_entity`` is provided which means the user wants to access a 
sub entity of the DAG
+        (e.g. DAG runs).
+            a. If ``method`` is GET, then check the user has READ permissions 
on the DAG and the sub entity
+            b. Else, check the user has EDIT permissions on the DAG and 
``method`` on the sub entity
+
+        :param method: The method to authorize.
+        :param dag_access_entity: The dag access entity.
+        :param dag_details: The dag details.
+        :param user: The user.
+        """
+        if not dag_access_entity:
+            # Scenario 1
+            return self._is_authorized_dag(method=method, 
dag_details=dag_details, user=user)
+        else:
+            # Scenario 2
+            resource_type = self._get_fab_resource_type(dag_access_entity)
+            dag_method = ResourceMethod.GET if method == ResourceMethod.GET 
else ResourceMethod.PUT
+
+            return self._is_authorized_dag(
+                method=dag_method, dag_details=dag_details, user=user
+            ) and self._is_authorized(method=method, 
resource_type=resource_type, user=user)
+
+    def is_authorized_dataset(self, *, method: ResourceMethod, user: BaseUser 
| None = None) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_DATASET, user=user)
+
+    def is_authorized_variable(self, *, method: ResourceMethod, user: BaseUser 
| None = None) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_VARIABLE, user=user)
+
+    def is_authorized_website(self, *, user: BaseUser | None = None) -> bool:
+        return self._is_authorized(method=ResourceMethod.GET, 
resource_type=RESOURCE_WEBSITE, user=user)
+
     def get_security_manager_override_class(self) -> type:
         """Return the security manager override."""
         from airflow.auth.managers.fab.security_manager.override import 
FabAirflowSecurityManagerOverride
 
         return FabAirflowSecurityManagerOverride
 
-    def url_for(self, *args, **kwargs):
-        """Wrapper to allow mocking without having to import at the top of the 
file."""
-        from flask import url_for
-
-        return url_for(*args, **kwargs)
-
     def get_url_login(self, **kwargs) -> str:
         """Return the login page url."""
         if not self.security_manager.auth_view:
             raise AirflowException("`auth_view` not defined in the security 
manager.")
         if "next_url" in kwargs and kwargs["next_url"]:
-            return 
self.url_for(f"{self.security_manager.auth_view.endpoint}.login", 
next=kwargs["next_url"])
+            return 
self._url_for(f"{self.security_manager.auth_view.endpoint}.login", 
next=kwargs["next_url"])
         else:
-            return 
self.url_for(f"{self.security_manager.auth_view.endpoint}.login")
+            return 
self._url_for(f"{self.security_manager.auth_view.endpoint}.login")
 
     def get_url_logout(self):
         """Return the logout page url."""
         if not self.security_manager.auth_view:
             raise AirflowException("`auth_view` not defined in the security 
manager.")
-        return 
self.url_for(f"{self.security_manager.auth_view.endpoint}.logout")
+        return 
self._url_for(f"{self.security_manager.auth_view.endpoint}.logout")
 
     def get_url_user_profile(self) -> str | None:
         """Return the url to a page displaying info about the current user."""
         if not self.security_manager.user_view:
             return None
-        return 
self.url_for(f"{self.security_manager.user_view.endpoint}.userinfo")
+        return 
self._url_for(f"{self.security_manager.user_view.endpoint}.userinfo")
+
+    def _is_authorized(
+        self,
+        *,
+        method: ResourceMethod,
+        resource_type: str,
+        user: BaseUser | None = None,
+    ) -> bool:
+        """
+        Return whether the user is authorized to perform a given action.
+
+        :param method: the method to perform
+        :param resource_type: the type of resource the user attempts to 
perform the action on
+        :param user: the user to perform the action on. If not provided (or 
None), it uses the current user
+
+        :meta private:
+        """
+        if not user:
+            user = self.get_user()
+
+        fab_action = self._get_fab_action(method)
+        user_permissions = self._get_user_permissions(user)
+
+        return (fab_action, resource_type) in user_permissions
+
+    def _is_authorized_dag(
+        self,
+        method: ResourceMethod,
+        dag_details: DagDetails | None = None,
+        user: BaseUser | None = None,
+    ) -> bool:
+        """
+        Return whether the user is authorized to perform a given action on a 
DAG.
+
+        :param method: the method to perform
+        :param dag_details: optional details about the DAG
+        :param user: the user to perform the action on. If not provided (or 
None), it uses the current user
+
+        :meta private:
+        """
+        is_global_authorized = self._is_authorized(method=method, 
resource_type=RESOURCE_DAG, user=user)
+        if is_global_authorized:
+            return True
+
+        if dag_details and dag_details.id:
+            # Check whether the user has permissions to access a specific DAG
+            if not user:
+                user = self.get_user()
+            fab_action = self._get_fab_action(method)
+            user_permissions = self._get_user_permissions(user)
+            resource_dag_name = self._resource_name_for_dag(dag_details.id)
+            return (fab_action, resource_dag_name) in user_permissions
+
+        return False
+
+    @staticmethod
+    def _get_fab_action(method: ResourceMethod) -> str:
+        """
+        Convert the action to a FAB action.
+
+        :param method: the method to convert
+
+        :meta private:
+        """
+        if method not in _MAP_METHOD_NAME_TO_FAB_ACTION_NAME:
+            raise AirflowException(f"Unknown method: {method}")
+        return _MAP_METHOD_NAME_TO_FAB_ACTION_NAME[method]
+
+    @staticmethod
+    def _get_fab_resource_type(dag_access_entity: DagAccessEntity):
+        """
+        Convert a DAG access entity to a FAB resource type.
+
+        :param dag_access_entity: the DAG access entity
+
+        :meta private:
+        """
+        if dag_access_entity not in 
_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE:
+            raise AirflowException(f"Unknown DAG access entity: 
{dag_access_entity}")
+        return _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE[dag_access_entity]
+
+    def _resource_name_for_dag(self, dag_id: str) -> str:
+        """
+        Returns the FAB resource name for a DAG id.
+
+        :param dag_id: the DAG id
+
+        :meta private:
+        """
+        root_dag_id = self._get_root_dag_id(dag_id)
+        if root_dag_id == RESOURCE_DAG:
+            return root_dag_id
+        if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
+            return root_dag_id
+        return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"
+
+    @staticmethod
+    def _get_user_permissions(user: BaseUser):
+        """
+        Return the user permissions.
+
+        ACTION_CAN_ACCESS_MENU is merged into ACTION_CAN_READ because 
they are very similar.
+        We can assume that if a user has permissions to read variables, they 
also have permissions to access
+        the menu "Variables".
+
+        :param user: the user to get permissions for
+
+        :meta private:
+        """
+        return [
+            (ACTION_CAN_READ if perm[0] == ACTION_CAN_ACCESS_MENU else 
perm[0], perm[1])
+            for perm in user.perms
+        ]
+
+    def _get_root_dag_id(self, dag_id: str) -> str:
+        """
+        Return the root DAG id in case of sub DAG, return the DAG id otherwise.
+
+        :param dag_id: the DAG id
+
+        :meta private:
+        """
+        if "." in dag_id:
+            dm = (
+                
self.security_manager.appbuilder.get_session.query(DagModel.dag_id, 
DagModel.root_dag_id)
+                .filter(DagModel.dag_id == dag_id)
+                .first()
+            )
+            return dm.root_dag_id or dm.dag_id
+        return dag_id
+
+    @staticmethod
+    def _url_for(*args, **kwargs):
+        """
+        Wrapper to allow mocking without having to import at the top of the 
file.
+
+        :meta private:
+        """
+        from flask import url_for
+
+        return url_for(*args, **kwargs)

Review Comment:
   I believe this can be condensed into
   
   ```python
   class FabAuthManager(...):
       from flask import url_for as _url_for
       """Docstring here..."""
   ```
   
   while being equally mockable. The difference is minimal though…



##########
airflow/auth/managers/fab/fab_auth_manager.py:
##########
@@ -87,35 +131,231 @@ def is_logged_in(self) -> bool:
         """Return whether the user is logged in."""
         return not self.get_user().is_anonymous
 
+    def is_authorized_configuration(self, *, method: ResourceMethod, user: 
BaseUser | None = None) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_CONFIG, user=user)
+
+    def is_authorized_cluster_activity(self, *, method: ResourceMethod, user: 
BaseUser | None = None) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_CLUSTER_ACTIVITY, user=user)
+
+    def is_authorized_connection(
+        self,
+        *,
+        method: ResourceMethod,
+        connection_details: ConnectionDetails | None = None,
+        user: BaseUser | None = None,
+    ) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_CONNECTION, user=user)
+
+    def is_authorized_dag(
+        self,
+        *,
+        method: ResourceMethod,
+        dag_access_entity: DagAccessEntity | None = None,
+        dag_details: DagDetails | None = None,
+        user: BaseUser | None = None,
+    ) -> bool:
+        """
+        Return whether the user is authorized to access the dag.
+
+        There are multiple scenarios:
+
+        1. ``dag_access_entity`` is not provided which means the user wants to access 
the DAG itself and not a sub
+        entity (e.g. DAG runs).
+        2. ``dag_access_entity`` is provided which means the user wants to access a 
sub entity of the DAG
+        (e.g. DAG runs).
+            a. If ``method`` is GET, then check the user has READ permissions 
on the DAG and the sub entity
+            b. Else, check the user has EDIT permissions on the DAG and 
``method`` on the sub entity
+
+        :param method: The method to authorize.
+        :param dag_access_entity: The dag access entity.
+        :param dag_details: The dag details.
+        :param user: The user.
+        """
+        if not dag_access_entity:
+            # Scenario 1
+            return self._is_authorized_dag(method=method, 
dag_details=dag_details, user=user)
+        else:
+            # Scenario 2
+            resource_type = self._get_fab_resource_type(dag_access_entity)
+            dag_method = ResourceMethod.GET if method == ResourceMethod.GET 
else ResourceMethod.PUT
+
+            return self._is_authorized_dag(
+                method=dag_method, dag_details=dag_details, user=user
+            ) and self._is_authorized(method=method, 
resource_type=resource_type, user=user)
+
+    def is_authorized_dataset(self, *, method: ResourceMethod, user: BaseUser 
| None = None) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_DATASET, user=user)
+
+    def is_authorized_variable(self, *, method: ResourceMethod, user: BaseUser 
| None = None) -> bool:
+        return self._is_authorized(method=method, 
resource_type=RESOURCE_VARIABLE, user=user)
+
+    def is_authorized_website(self, *, user: BaseUser | None = None) -> bool:
+        return self._is_authorized(method=ResourceMethod.GET, 
resource_type=RESOURCE_WEBSITE, user=user)
+
     def get_security_manager_override_class(self) -> type:
         """Return the security manager override."""
         from airflow.auth.managers.fab.security_manager.override import 
FabAirflowSecurityManagerOverride
 
         return FabAirflowSecurityManagerOverride
 
-    def url_for(self, *args, **kwargs):
-        """Wrapper to allow mocking without having to import at the top of the 
file."""
-        from flask import url_for
-
-        return url_for(*args, **kwargs)
-
     def get_url_login(self, **kwargs) -> str:
         """Return the login page url."""
         if not self.security_manager.auth_view:
             raise AirflowException("`auth_view` not defined in the security 
manager.")
         if "next_url" in kwargs and kwargs["next_url"]:
-            return 
self.url_for(f"{self.security_manager.auth_view.endpoint}.login", 
next=kwargs["next_url"])
+            return 
self._url_for(f"{self.security_manager.auth_view.endpoint}.login", 
next=kwargs["next_url"])
         else:
-            return 
self.url_for(f"{self.security_manager.auth_view.endpoint}.login")
+            return 
self._url_for(f"{self.security_manager.auth_view.endpoint}.login")
 
     def get_url_logout(self):
         """Return the logout page url."""
         if not self.security_manager.auth_view:
             raise AirflowException("`auth_view` not defined in the security 
manager.")
-        return 
self.url_for(f"{self.security_manager.auth_view.endpoint}.logout")
+        return 
self._url_for(f"{self.security_manager.auth_view.endpoint}.logout")
 
     def get_url_user_profile(self) -> str | None:
         """Return the url to a page displaying info about the current user."""
         if not self.security_manager.user_view:
             return None
-        return 
self.url_for(f"{self.security_manager.user_view.endpoint}.userinfo")
+        return 
self._url_for(f"{self.security_manager.user_view.endpoint}.userinfo")
+
+    def _is_authorized(
+        self,
+        *,
+        method: ResourceMethod,
+        resource_type: str,
+        user: BaseUser | None = None,
+    ) -> bool:
+        """
+        Return whether the user is authorized to perform a given action.
+
+        :param method: the method to perform
+        :param resource_type: the type of resource the user attempts to 
perform the action on
+        :param user: the user to perform the action on. If not provided (or 
None), it uses the current user
+
+        :meta private:
+        """
+        if not user:
+            user = self.get_user()
+
+        fab_action = self._get_fab_action(method)
+        user_permissions = self._get_user_permissions(user)
+
+        return (fab_action, resource_type) in user_permissions
+
+    def _is_authorized_dag(
+        self,
+        method: ResourceMethod,
+        dag_details: DagDetails | None = None,
+        user: BaseUser | None = None,
+    ) -> bool:
+        """
+        Return whether the user is authorized to perform a given action on a 
DAG.
+
+        :param method: the method to perform
+        :param dag_details: optional details about the DAG
+        :param user: the user to perform the action on. If not provided (or 
None), it uses the current user
+
+        :meta private:
+        """
+        is_global_authorized = self._is_authorized(method=method, 
resource_type=RESOURCE_DAG, user=user)
+        if is_global_authorized:
+            return True
+
+        if dag_details and dag_details.id:
+            # Check whether the user has permissions to access a specific DAG
+            if not user:
+                user = self.get_user()
+            fab_action = self._get_fab_action(method)
+            user_permissions = self._get_user_permissions(user)
+            resource_dag_name = self._resource_name_for_dag(dag_details.id)
+            return (fab_action, resource_dag_name) in user_permissions
+
+        return False
+
+    @staticmethod
+    def _get_fab_action(method: ResourceMethod) -> str:
+        """
+        Convert the action to a FAB action.
+
+        :param method: the method to convert
+
+        :meta private:
+        """
+        if method not in _MAP_METHOD_NAME_TO_FAB_ACTION_NAME:
+            raise AirflowException(f"Unknown method: {method}")
+        return _MAP_METHOD_NAME_TO_FAB_ACTION_NAME[method]
+
+    @staticmethod
+    def _get_fab_resource_type(dag_access_entity: DagAccessEntity):
+        """
+        Convert a DAG access entity to a FAB resource type.
+
+        :param dag_access_entity: the DAG access entity
+
+        :meta private:
+        """
+        if dag_access_entity not in 
_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE:
+            raise AirflowException(f"Unknown DAG access entity: 
{dag_access_entity}")
+        return _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE[dag_access_entity]
+
+    def _resource_name_for_dag(self, dag_id: str) -> str:
+        """
+        Returns the FAB resource name for a DAG id.
+
+        :param dag_id: the DAG id
+
+        :meta private:
+        """
+        root_dag_id = self._get_root_dag_id(dag_id)
+        if root_dag_id == RESOURCE_DAG:
+            return root_dag_id
+        if root_dag_id.startswith(RESOURCE_DAG_PREFIX):
+            return root_dag_id
+        return f"{RESOURCE_DAG_PREFIX}{root_dag_id}"
+
+    @staticmethod
+    def _get_user_permissions(user: BaseUser):
+        """
+        Return the user permissions.
+
+        ACTION_CAN_ACCESS_MENU is merged into ACTION_CAN_READ because 
they are very similar.
+        We can assume that if a user has permissions to read variables, they 
also have permissions to access
+        the menu "Variables".
+
+        :param user: the user to get permissions for
+
+        :meta private:
+        """
+        return [
+            (ACTION_CAN_READ if perm[0] == ACTION_CAN_ACCESS_MENU else 
perm[0], perm[1])
+            for perm in user.perms
+        ]
+
+    def _get_root_dag_id(self, dag_id: str) -> str:
+        """
+        Return the root DAG id in case of sub DAG, return the DAG id otherwise.
+
+        :param dag_id: the DAG id
+
+        :meta private:
+        """
+        if "." in dag_id:
+            dm = (
+                
self.security_manager.appbuilder.get_session.query(DagModel.dag_id, 
DagModel.root_dag_id)
+                .filter(DagModel.dag_id == dag_id)
+                .first()
+            )
+            return dm.root_dag_id or dm.dag_id
+        return dag_id
+
+    @staticmethod
+    def _url_for(*args, **kwargs):
+        """
+        Wrapper to allow mocking without having to import at the top of the 
file.
+
+        :meta private:
+        """
+        from flask import url_for
+
+        return url_for(*args, **kwargs)

Review Comment:
   I believe this can be condensed into
   
   ```python
   class FabAuthManager(...):
       from flask import url_for as _url_for
       """Docstring here..."""
   ```
   
   while being equally mockable. The difference is minimal though…



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to