This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 28e3d65 Add per-DAG delete permissions (#21938)
28e3d65 is described below
commit 28e3d6599aed3f8167477d16c842d7aa1e5c1d4d
Author: blag <[email protected]>
AuthorDate: Fri Mar 4 13:19:08 2022 -0700
Add per-DAG delete permissions (#21938)
This PR adds per-DAG delete permissions and extends the sync-perms
subcommand to add delete permissions to the database for all existing DAGs (it
does not, however, grant any of the new DAG delete permissions to any roles).
---
airflow/models/dag.py | 2 +-
airflow/security/permissions.py | 2 +-
airflow/www/security.py | 12 ++++++---
airflow/www/views.py | 14 +++++-----
tests/www/views/test_views_acl.py | 3 ++-
tests/www/views/test_views_tasks.py | 52 ++++++++++++++++++++++++++++++++++++-
6 files changed, 71 insertions(+), 14 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 23eb0ce..a78193a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -254,7 +254,7 @@ class DAG(LoggingMixin):
:param on_success_callback: Much like the ``on_failure_callback`` except
that it is executed when the dag succeeds.
:param access_control: Specify optional DAG-level actions, e.g.,
- "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}"
+ "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit',
'can_delete'}}"
:param is_paused_upon_creation: Specifies if the dag is paused when
created for the first time.
If the dag exists already, this flag will be ignored. If this optional
parameter
is not specified, the global config setting will be used.
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
index d3d8452..2d5c0b9 100644
--- a/airflow/security/permissions.py
+++ b/airflow/security/permissions.py
@@ -63,7 +63,7 @@ ACTION_CAN_ACCESS_MENU = "menu_access"
DEPRECATED_ACTION_CAN_DAG_READ = "can_dag_read"
DEPRECATED_ACTION_CAN_DAG_EDIT = "can_dag_edit"
-DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT}
+DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}
def resource_name_for_dag(dag_id):
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 7240237..e53513b 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -325,6 +325,11 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
dag_resource_name = permissions.resource_name_for_dag(dag_id)
return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name,
user=user)
+ def can_delete_dag(self, dag_id, user=None) -> bool:
+ """Determines whether a user has DAG delete access."""
+ dag_resource_name = permissions.resource_name_for_dag(dag_id)
+ return self.has_access(permissions.ACTION_CAN_DELETE, dag_resource_name, user=user)
+
def prefixed_dag_id(self, dag_id):
"""Returns the permission name for a DAG id."""
warnings.warn(
@@ -344,7 +349,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
def has_access(self, action_name, resource_name, user=None) -> bool:
"""
Verify whether a given user could perform a certain action
- (e.g can_read, can_write) on the given resource.
+ (e.g can_read, can_write, can_delete) on the given resource.
:param action_name: action_name on resource (e.g can_read, can_edit).
:param resource_name: name of view-menu or resource.
@@ -479,8 +484,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
def create_dag_specific_permissions(self) -> None:
"""
- Creates 'can_read' and 'can_edit' permissions for all DAGs,
- along with any `access_control` permissions provided in them.
+ Creates 'can_read', 'can_edit', and 'can_delete' permissions for all
+ DAGs, along with any `access_control` permissions provided in them.
This does iterate through ALL the DAGs, which can be slow. See
`sync_perm_for_dag`
if you only need to sync a single DAG.
@@ -647,6 +652,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
if perm in (
(permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
(permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
):
can_access_all_dags = self.has_access(*perm)
if can_access_all_dags:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 647adec..64c6f4b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -612,10 +612,7 @@ def add_user_permissions_to_dag(sender, template, context,
**extra):
dag.can_edit = current_app.appbuilder.sm.can_edit_dag(dag.dag_id)
dag.can_trigger = dag.can_edit and can_create_dag_run
- dag.can_delete = current_app.appbuilder.sm.has_access(
- permissions.ACTION_CAN_DELETE,
- permissions.RESOURCE_DAG,
- )
+ dag.can_delete = current_app.appbuilder.sm.can_delete_dag(dag.dag_id)
context['dag'] = dag
@@ -753,19 +750,22 @@ class Airflow(AirflowBaseView):
permissions.RESOURCE_DAG_RUN,
) in user_permissions
- can_delete_dag = (
+ all_dags_deletable = (
permissions.ACTION_CAN_DELETE,
permissions.RESOURCE_DAG,
) in user_permissions
for dag in dags:
+ dag_resource_name = permissions.RESOURCE_DAG_PREFIX + dag.dag_id
if all_dags_editable:
dag.can_edit = True
else:
- dag_resource_name = permissions.RESOURCE_DAG_PREFIX +
dag.dag_id
dag.can_edit = (permissions.ACTION_CAN_EDIT,
dag_resource_name) in user_permissions
dag.can_trigger = dag.can_edit and can_create_dag_run
- dag.can_delete = can_delete_dag
+ if all_dags_deletable:
+ dag.can_delete = True
+ else:
+ dag.can_delete = (permissions.ACTION_CAN_DELETE, dag_resource_name) in user_permissions
dagtags = session.query(DagTag.name).distinct(DagTag.name).all()
tags = [
diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py
index 53c45b9..a37d384 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -190,11 +190,12 @@ def test_permission_exist(acl_app):
perms_views = acl_app.appbuilder.sm.get_resource_permissions(
acl_app.appbuilder.sm.get_resource('DAG:example_bash_operator'),
)
- assert len(perms_views) == 2
+ assert len(perms_views) == 3
perms = {str(perm) for perm in perms_views}
assert "can read on DAG:example_bash_operator" in perms
assert "can edit on DAG:example_bash_operator" in perms
+ assert "can delete on DAG:example_bash_operator" in perms
@pytest.mark.usefixtures("user_edit_one_dag")
diff --git a/tests/www/views/test_views_tasks.py b/tests/www/views/test_views_tasks.py
index bd0c7ba..1b071fd 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -25,7 +25,7 @@ import pytest
from airflow import settings
from airflow.executors.celery_executor import CeleryExecutor
-from airflow.models import DagBag, DagModel, TaskInstance, TaskReschedule
+from airflow.models import DAG, DagBag, DagModel, TaskInstance, TaskReschedule
from airflow.models.dagcode import DagCode
from airflow.security import permissions
from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES,
RUNNABLE_STATES
@@ -602,6 +602,56 @@ def
test_delete_dag_button_for_dag_on_scheduler_only(admin_client, new_id_exampl
check_content_in_response(f"return confirmDeleteDag(this,
'{test_dag_id}')", resp)
+@pytest.fixture()
+def new_dag_to_delete():
+ dag = DAG('new_dag_to_delete', is_paused_upon_creation=True)
+ session = settings.Session()
+ dag.sync_to_db(session=session)
+ return dag
+
+
+@pytest.fixture()
+def per_dag_perm_user_client(app, new_dag_to_delete):
+ sm = app.appbuilder.sm
+ perm = f"{permissions.RESOURCE_DAG_PREFIX}{new_dag_to_delete.dag_id}"
+
+ sm.create_permission(permissions.ACTION_CAN_DELETE, perm)
+
+ create_user(
+ app,
+ username="test_user_per_dag_perms",
+ role_name="User with some perms",
+ permissions=[
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+ (permissions.ACTION_CAN_DELETE, perm),
+ (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+ ],
+ )
+
+ sm.find_user(username="test_user_per_dag_perms")
+
+ yield client_with_login(
+ app,
+ username="test_user_per_dag_perms",
+ password="test_user_per_dag_perms",
+ )
+
+ delete_user(app, username="test_user_per_dag_perms") # type: ignore
+ delete_roles(app)
+
+
+def test_delete_just_dag_per_dag_permissions(new_dag_to_delete, per_dag_perm_user_client):
+ resp = per_dag_perm_user_client.post(
+ f"delete?dag_id={new_dag_to_delete.dag_id}&next=/home", follow_redirects=True
+ )
+ check_content_in_response(f'Deleting DAG with id {new_dag_to_delete.dag_id}.', resp)
+
+
+def test_delete_just_dag_resource_permissions(new_dag_to_delete, user_client):
+ resp = user_client.post(f"delete?dag_id={new_dag_to_delete.dag_id}&next=/home", follow_redirects=True)
+ check_content_in_response(f'Deleting DAG with id {new_dag_to_delete.dag_id}.', resp)
+
+
@pytest.mark.parametrize("endpoint", ["graph", "tree"])
def
test_show_external_log_redirect_link_with_local_log_handler(capture_templates,
admin_client, endpoint):
"""Do not show external links if log handler is local."""