potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355994191
##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
user = g.user
return user.roles
- def get_readable_dags(self, user) -> Iterable[DagModel]:
- """Gets the DAGs readable by authenticated user."""
- warnings.warn(
- "`get_readable_dags` has been deprecated. Please use
`get_readable_dag_ids` instead.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", RemovedInAirflow3Warning)
- return self.get_accessible_dags([permissions.ACTION_CAN_READ],
user)
-
- def get_editable_dags(self, user) -> Iterable[DagModel]:
- """Gets the DAGs editable by authenticated user."""
- warnings.warn(
- "`get_editable_dags` has been deprecated. Please use
`get_editable_dag_ids` instead.",
- RemovedInAirflow3Warning,
- stacklevel=2,
- )
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", RemovedInAirflow3Warning)
- return self.get_accessible_dags([permissions.ACTION_CAN_EDIT],
user)
-
- @provide_session
- def get_accessible_dags(
- self,
- user_actions: Container[str] | None,
- user,
- session: Session = NEW_SESSION,
- ) -> Iterable[DagModel]:
- warnings.warn(
- "`get_accessible_dags` has been deprecated. Please use
`get_accessible_dag_ids` instead.",
- RemovedInAirflow3Warning,
- stacklevel=3,
- )
- dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
- return
session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
- def get_readable_dag_ids(self, user) -> set[str]:
+ def get_readable_dag_ids(self, user=None) -> set[str]:
"""Gets the DAG IDs readable by authenticated user."""
- return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+ return self.get_permitted_dag_ids(methods=["GET"], user=user)
- def get_editable_dag_ids(self, user) -> set[str]:
+ def get_editable_dag_ids(self, user=None) -> set[str]:
"""Gets the DAG IDs editable by authenticated user."""
- return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+ return self.get_permitted_dag_ids(methods=["PUT"], user=user)
@provide_session
- def get_accessible_dag_ids(
+ def get_permitted_dag_ids(
self,
- user,
- user_actions: Container[str] | None = None,
+ *,
+ methods: Container[ResourceMethod] | None = None,
+ user=None,
session: Session = NEW_SESSION,
) -> set[str]:
"""Generic function to get readable or writable DAGs for user."""
- if not user_actions:
- user_actions = [permissions.ACTION_CAN_EDIT,
permissions.ACTION_CAN_READ]
+ if not methods:
+ methods = ["PUT", "GET"]
- if not get_auth_manager().is_logged_in():
- roles = user.roles
- else:
- if (permissions.ACTION_CAN_EDIT in user_actions and
self.can_edit_all_dags(user)) or (
- permissions.ACTION_CAN_READ in user_actions and
self.can_read_all_dags(user)
- ):
- return {dag.dag_id for dag in
session.execute(select(DagModel.dag_id))}
- user_query = session.scalar(
- select(User)
- .options(
- joinedload(User.roles)
- .subqueryload(Role.permissions)
- .options(joinedload(Permission.action),
joinedload(Permission.resource))
- )
- .where(User.id == user.id)
- )
- roles = user_query.roles
-
- resources = set()
- for role in roles:
- for permission in role.permissions:
- action = permission.action.name
- if action in user_actions:
- resource = permission.resource.name
- if resource == permissions.RESOURCE_DAG:
- return {dag.dag_id for dag in
session.execute(select(DagModel.dag_id))}
- if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-
resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
- else:
- resources.add(resource)
- return {
- dag.dag_id
- for dag in
session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
- }
-
- def can_access_some_dags(self, action: str, dag_id: str | None = None) ->
bool:
- """Checks if user has read or write access to some dags."""
- if dag_id and dag_id != "~":
- root_dag_id = self._get_root_dag_id(dag_id)
- return self.has_access(action,
permissions.resource_name_for_dag(root_dag_id))
+ dag_ids = {dag.dag_id for dag in
session.execute(select(DagModel.dag_id))}
- user = g.user
- if action == permissions.ACTION_CAN_READ:
- return any(self.get_readable_dag_ids(user))
- return any(self.get_editable_dag_ids(user))
+ if ("GET" in methods and
get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+ "PUT" in methods and
get_auth_manager().is_authorized_dag(method="PUT", user=user)
+ ):
+ return dag_ids
- def can_read_dag(self, dag_id: str, user=None) -> bool:
- """Determines whether a user has DAG read access."""
- root_dag_id = self._get_root_dag_id(dag_id)
- dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
- return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name,
user=user)
+ def _is_permitted_dag_id(method: ResourceMethod, methods:
Container[ResourceMethod], dag_id: str):
+ return method in methods and get_auth_manager().is_authorized_dag(
+ method=method, details=DagDetails(id=dag_id), user=user
+ )
- def can_edit_dag(self, dag_id: str, user=None) -> bool:
- """Determines whether a user has DAG edit access."""
- root_dag_id = self._get_root_dag_id(dag_id)
- dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
- return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name,
user=user)
+ return {
Review Comment:
I think if we leave it like this, it might incur a performance hit for FAB
users who have only access to some dags. In the current implementation reading
all dags as set is only done when the user has permission to read all dags in
the role.
Also it hampers any future multi-tenant implementation of auth manager,
because basically it will not allow to optimize this via auth_manager.
Basically for all users or all tenants, they will have to read all dag_ids to
memory always when they want to see DAGs page. This will become a problem very
quickly for any multi-tenant environment.
I **think** the solution for that should be different. Rather than
implementing all the loop here, the "get_permitted_dag_ids" should be
implemented in the `auth_manager`.
Here we can do some basic checks for all dags, but then getting the list of
allowed dags should be delegated to auth_manager.
```python
if ("GET" in methods and
get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
"PUT" in methods and
get_auth_manager().is_authorized_dag(method="PUT", user=user)
):
return {dag.dag_id for dag in
session.execute(select(DagModel.dag_id))}
return get_auth_manager().get_permitted_dag_ids(methods=methods,
user=user)
```
Then each auth manager could implement an optimized version of the
`get_permitted_dag_ids`. For FAB, it will be the original implementation with
resources, but for others it could be different. Conceptually this for example:
```python
tenant = get_tenant(user)
return {dag.dag_id for dag in
session.execute(select(DagModel.dag_id).filter_by(tenant=tenant)}
```
This is no problem for auth manager to have two ways of checking permissions:
1) when you have dag_id - you can check it individually
2) when you get the list you have a method that returns the list based on
same criteria but implemented without using the individual check (bulk).
This is the only way to implement performant version of such permission
check IMHO.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]