This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 28e3d65  Add per-DAG delete permissions (#21938)
28e3d65 is described below

commit 28e3d6599aed3f8167477d16c842d7aa1e5c1d4d
Author: blag <[email protected]>
AuthorDate: Fri Mar 4 13:19:08 2022 -0700

    Add per-DAG delete permissions (#21938)
    
    This PR adds per-DAG delete permissions and extends the sync-perms 
subcommand to add delete permissions to the database for all existing DAGs (it 
does not, however, grant any of the new DAG delete permissions to any roles).
---
 airflow/models/dag.py               |  2 +-
 airflow/security/permissions.py     |  2 +-
 airflow/www/security.py             | 12 ++++++---
 airflow/www/views.py                | 14 +++++-----
 tests/www/views/test_views_acl.py   |  3 ++-
 tests/www/views/test_views_tasks.py | 52 ++++++++++++++++++++++++++++++++++++-
 6 files changed, 71 insertions(+), 14 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 23eb0ce..a78193a 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -254,7 +254,7 @@ class DAG(LoggingMixin):
     :param on_success_callback: Much like the ``on_failure_callback`` except
         that it is executed when the dag succeeds.
     :param access_control: Specify optional DAG-level actions, e.g.,
-        "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}"
+        "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 
'can_delete'}}"
     :param is_paused_upon_creation: Specifies if the dag is paused when 
created for the first time.
         If the dag exists already, this flag will be ignored. If this optional 
parameter
         is not specified, the global config setting will be used.
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
index d3d8452..2d5c0b9 100644
--- a/airflow/security/permissions.py
+++ b/airflow/security/permissions.py
@@ -63,7 +63,7 @@ ACTION_CAN_ACCESS_MENU = "menu_access"
 DEPRECATED_ACTION_CAN_DAG_READ = "can_dag_read"
 DEPRECATED_ACTION_CAN_DAG_EDIT = "can_dag_edit"
 
-DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT}
+DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}
 
 
 def resource_name_for_dag(dag_id):
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 7240237..e53513b 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -325,6 +325,11 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
         dag_resource_name = permissions.resource_name_for_dag(dag_id)
         return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, 
user=user)
 
+    def can_delete_dag(self, dag_id, user=None) -> bool:
+        """Determines whether a user has DAG delete access."""
+        dag_resource_name = permissions.resource_name_for_dag(dag_id)
+        return self.has_access(permissions.ACTION_CAN_DELETE, 
dag_resource_name, user=user)
+
     def prefixed_dag_id(self, dag_id):
         """Returns the permission name for a DAG id."""
         warnings.warn(
@@ -344,7 +349,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
     def has_access(self, action_name, resource_name, user=None) -> bool:
         """
         Verify whether a given user could perform a certain action
-        (e.g can_read, can_write) on the given resource.
+        (e.g can_read, can_write, can_delete) on the given resource.
 
         :param action_name: action_name on resource (e.g can_read, can_edit).
         :param resource_name: name of view-menu or resource.
@@ -479,8 +484,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
 
     def create_dag_specific_permissions(self) -> None:
         """
-        Creates 'can_read' and 'can_edit' permissions for all DAGs,
-        along with any `access_control` permissions provided in them.
+        Creates 'can_read', 'can_edit', and 'can_delete' permissions for all
+        DAGs, along with any `access_control` permissions provided in them.
 
         This does iterate through ALL the DAGs, which can be slow. See 
`sync_perm_for_dag`
         if you only need to sync a single DAG.
@@ -647,6 +652,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
             if perm in (
                 (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
                 (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG),
+                (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG),
             ):
                 can_access_all_dags = self.has_access(*perm)
                 if can_access_all_dags:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 647adec..64c6f4b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -612,10 +612,7 @@ def add_user_permissions_to_dag(sender, template, context, 
**extra):
 
         dag.can_edit = current_app.appbuilder.sm.can_edit_dag(dag.dag_id)
         dag.can_trigger = dag.can_edit and can_create_dag_run
-        dag.can_delete = current_app.appbuilder.sm.has_access(
-            permissions.ACTION_CAN_DELETE,
-            permissions.RESOURCE_DAG,
-        )
+        dag.can_delete = current_app.appbuilder.sm.can_delete_dag(dag.dag_id)
         context['dag'] = dag
 
 
@@ -753,19 +750,22 @@ class Airflow(AirflowBaseView):
                 permissions.RESOURCE_DAG_RUN,
             ) in user_permissions
 
-            can_delete_dag = (
+            all_dags_deletable = (
                 permissions.ACTION_CAN_DELETE,
                 permissions.RESOURCE_DAG,
             ) in user_permissions
 
             for dag in dags:
+                dag_resource_name = permissions.RESOURCE_DAG_PREFIX + 
dag.dag_id
                 if all_dags_editable:
                     dag.can_edit = True
                 else:
-                    dag_resource_name = permissions.RESOURCE_DAG_PREFIX + 
dag.dag_id
                     dag.can_edit = (permissions.ACTION_CAN_EDIT, 
dag_resource_name) in user_permissions
                 dag.can_trigger = dag.can_edit and can_create_dag_run
-                dag.can_delete = can_delete_dag
+                if all_dags_deletable:
+                    dag.can_delete = True
+                else:
+                    dag.can_delete = (permissions.ACTION_CAN_DELETE, 
dag_resource_name) in user_permissions
 
             dagtags = session.query(DagTag.name).distinct(DagTag.name).all()
             tags = [
diff --git a/tests/www/views/test_views_acl.py 
b/tests/www/views/test_views_acl.py
index 53c45b9..a37d384 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -190,11 +190,12 @@ def test_permission_exist(acl_app):
     perms_views = acl_app.appbuilder.sm.get_resource_permissions(
         acl_app.appbuilder.sm.get_resource('DAG:example_bash_operator'),
     )
-    assert len(perms_views) == 2
+    assert len(perms_views) == 3
 
     perms = {str(perm) for perm in perms_views}
     assert "can read on DAG:example_bash_operator" in perms
     assert "can edit on DAG:example_bash_operator" in perms
+    assert "can delete on DAG:example_bash_operator" in perms
 
 
 @pytest.mark.usefixtures("user_edit_one_dag")
diff --git a/tests/www/views/test_views_tasks.py 
b/tests/www/views/test_views_tasks.py
index bd0c7ba..1b071fd 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -25,7 +25,7 @@ import pytest
 
 from airflow import settings
 from airflow.executors.celery_executor import CeleryExecutor
-from airflow.models import DagBag, DagModel, TaskInstance, TaskReschedule
+from airflow.models import DAG, DagBag, DagModel, TaskInstance, TaskReschedule
 from airflow.models.dagcode import DagCode
 from airflow.security import permissions
 from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, 
RUNNABLE_STATES
@@ -602,6 +602,56 @@ def 
test_delete_dag_button_for_dag_on_scheduler_only(admin_client, new_id_exampl
     check_content_in_response(f"return confirmDeleteDag(this, 
'{test_dag_id}')", resp)
 
 
[email protected]()
+def new_dag_to_delete():
+    dag = DAG('new_dag_to_delete', is_paused_upon_creation=True)
+    session = settings.Session()
+    dag.sync_to_db(session=session)
+    return dag
+
+
[email protected]()
+def per_dag_perm_user_client(app, new_dag_to_delete):
+    sm = app.appbuilder.sm
+    perm = f"{permissions.RESOURCE_DAG_PREFIX}{new_dag_to_delete.dag_id}"
+
+    sm.create_permission(permissions.ACTION_CAN_DELETE, perm)
+
+    create_user(
+        app,
+        username="test_user_per_dag_perms",
+        role_name="User with some perms",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_DELETE, perm),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+        ],
+    )
+
+    sm.find_user(username="test_user_per_dag_perms")
+
+    yield client_with_login(
+        app,
+        username="test_user_per_dag_perms",
+        password="test_user_per_dag_perms",
+    )
+
+    delete_user(app, username="test_user_per_dag_perms")  # type: ignore
+    delete_roles(app)
+
+
+def test_delete_just_dag_per_dag_permissions(new_dag_to_delete, 
per_dag_perm_user_client):
+    resp = per_dag_perm_user_client.post(
+        f"delete?dag_id={new_dag_to_delete.dag_id}&next=/home", 
follow_redirects=True
+    )
+    check_content_in_response(f'Deleting DAG with id 
{new_dag_to_delete.dag_id}.', resp)
+
+
+def test_delete_just_dag_resource_permissions(new_dag_to_delete, user_client):
+    resp = 
user_client.post(f"delete?dag_id={new_dag_to_delete.dag_id}&next=/home", 
follow_redirects=True)
+    check_content_in_response(f'Deleting DAG with id 
{new_dag_to_delete.dag_id}.', resp)
+
+
 @pytest.mark.parametrize("endpoint", ["graph", "tree"])
 def 
test_show_external_log_redirect_link_with_local_log_handler(capture_templates, 
admin_client, endpoint):
     """Do not show external links if log handler is local."""

Reply via email to