This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e7e2462f1b12b1a1804bfa781ebed9c0b8d15ac8
Author: Huy Mạc Quang <[email protected]>
AuthorDate: Wed Apr 26 21:11:14 2023 +0700

    Fix DAG.access_control can't sync when clean access_control (#30340)
    
    * Reset permission if `access_control` is empty
    
    * Check `resource` before call `_revoke_all_stale_permissions`
    
    * Fix static checks
    
    (cherry picked from commit 2c0c8b8bfb5287e10dc40b73f326bbf9a0437bb1)
---
 airflow/models/dagbag.py           | 28 ++++----------------
 airflow/www/security.py            | 17 ++++++++++++
 tests/models/test_dagbag.py        |  3 +--
 tests/www/views/test_views_home.py | 54 +++++++++++++++++++++++++++++++++++---
 4 files changed, 74 insertions(+), 28 deletions(-)

diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 3d00ac973f..305009e2ea 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -689,29 +689,11 @@ class DagBag(LoggingMixin):
     @classmethod
     @provide_session
     def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
-        """Sync DAG specific permissions, if necessary"""
-        from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag
-        from airflow.www.fab_security.sqla.models import Action, Permission, Resource
-
+        """Sync DAG specific permissions"""
         root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id
 
-        def needs_perms(dag_id: str) -> bool:
-            dag_resource_name = resource_name_for_dag(dag_id)
-            for permission_name in DAG_ACTIONS:
-                if not (
-                    session.query(Permission)
-                    .join(Action)
-                    .join(Resource)
-                    .filter(Action.name == permission_name)
-                    .filter(Resource.name == dag_resource_name)
-                    .one_or_none()
-                ):
-                    return True
-            return False
-
-        if dag.access_control or needs_perms(root_dag_id):
-            cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
-            from airflow.www.security import ApplessAirflowSecurityManager
+        cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
+        from airflow.www.security import ApplessAirflowSecurityManager
 
-            security_manager = ApplessAirflowSecurityManager(session=session)
-            security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
+        security_manager = ApplessAirflowSecurityManager(session=session)
+        security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 201c9ada0a..50c484082c 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -655,8 +655,25 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
         for dag_action_name in self.DAG_ACTIONS:
             self.create_permission(dag_action_name, dag_resource_name)
 
+        def _revoke_all_stale_permissions(resource: Resource):
+            existing_dag_perms = self.get_resource_permissions(resource)
+            for perm in existing_dag_perms:
+                non_admin_roles = [role for role in perm.role if role.name != "Admin"]
+                for role in non_admin_roles:
+                    self.log.info(
+                        "Revoking '%s' on DAG '%s' for role '%s'",
+                        perm.action,
+                        dag_resource_name,
+                        role.name,
+                    )
+                    self.remove_permission_from_role(role, perm)
+
         if access_control:
             self._sync_dag_view_permissions(dag_resource_name, access_control)
+        else:
+            resource = self.get_resource(dag_resource_name)
+            if resource:
+                _revoke_all_stale_permissions(resource)
 
     def _sync_dag_view_permissions(self, dag_id: str, access_control: dict[str, Collection[str]]) -> None:
         """
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index fd4d5bdc82..22ff525bb8 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -908,7 +908,6 @@ class TestDagBag:
     def test_sync_perm_for_dag(self, mock_security_manager):
         """
         Test that dagbag._sync_perm_for_dag will call ApplessAirflowSecurityManager.sync_perm_for_dag
-        when DAG specific perm views don't exist already or the DAG has access_control set.
         """
         db_clean_up()
         with create_session() as session:
@@ -932,7 +931,7 @@ class TestDagBag:
 
             # perms now exist
             _sync_perms()
-            mock_sync_perm_for_dag.assert_not_called()
+            mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator", None)
 
             # Always sync if we have access_control
             dag.access_control = {"Public": {"can_read"}}
diff --git a/tests/www/views/test_views_home.py b/tests/www/views/test_views_home.py
index 6619bcdf71..045e34b002 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -155,6 +155,44 @@ def working_dags(tmpdir):
             _process_file(filename, session)
 
 
+@pytest.fixture()
+def working_dags_with_read_perm(tmpdir):
+    dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])"
+    dag_contents_template_with_read_perm = (
+        "from airflow import DAG\ndag = DAG('{}', tags=['{}'], "
+        "access_control={{'role_single_dag':{{'can_read'}}}}) "
+    )
+    with create_session() as session:
+        for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)):
+            filename = os.path.join(tmpdir, f"{dag_id}.py")
+            if dag_id == "filter_test_1":
+                with open(filename, "w") as f:
+                    f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag))
+            else:
+                with open(filename, "w") as f:
+                    f.writelines(dag_contents_template.format(dag_id, tag))
+            _process_file(filename, session)
+
+
+@pytest.fixture()
+def working_dags_with_edit_perm(tmpdir):
+    dag_contents_template = "from airflow import DAG\ndag = DAG('{}', tags=['{}'])"
+    dag_contents_template_with_read_perm = (
+        "from airflow import DAG\ndag = DAG('{}', tags=['{}'], "
+        "access_control={{'role_single_dag':{{'can_edit'}}}}) "
+    )
+    with create_session() as session:
+        for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)):
+            filename = os.path.join(tmpdir, f"{dag_id}.py")
+            if dag_id == "filter_test_1":
+                with open(filename, "w") as f:
+                    f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag))
+            else:
+                with open(filename, "w") as f:
+                    f.writelines(dag_contents_template.format(dag_id, tag))
+            _process_file(filename, session)
+
+
 @pytest.fixture()
 def broken_dags(tmpdir, working_dags):
     with create_session() as session:
@@ -165,6 +203,16 @@ def broken_dags(tmpdir, working_dags):
             _process_file(filename, session)
 
 
+@pytest.fixture()
+def broken_dags_with_read_perm(tmpdir, working_dags_with_read_perm):
+    with create_session() as session:
+        for dag_id in TEST_FILTER_DAG_IDS:
+            filename = os.path.join(tmpdir, f"{dag_id}.py")
+            with open(filename, "w") as f:
+                f.writelines("airflow DAG")
+            _process_file(filename, session)
+
+
 def test_home_filter_tags(working_dags, admin_client):
     with admin_client:
         admin_client.get("home?tags=example&tags=data", follow_redirects=True)
@@ -183,7 +231,7 @@ def test_home_importerrors(broken_dags, user_client):
 
 
 @pytest.mark.parametrize("page", ["home", "home?status=active", "home?status=paused", "home?status=all"])
-def test_home_importerrors_filtered_singledag_user(broken_dags, client_single_dag, page):
+def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm, client_single_dag, page):
     # Users that can only see certain DAGs get a filtered list of import errors
     resp = client_single_dag.get(page, follow_redirects=True)
     check_content_in_response("Import Errors", resp)
@@ -201,7 +249,7 @@ def test_home_dag_list(working_dags, user_client):
         check_content_in_response(f"dag_id={dag_id}", resp)
 
 
-def test_home_dag_list_filtered_singledag_user(working_dags, client_single_dag):
+def test_home_dag_list_filtered_singledag_user(working_dags_with_read_perm, client_single_dag):
     # Users that can only see certain DAGs get a filtered list
     resp = client_single_dag.get("home", follow_redirects=True)
     # They can see the first DAG
@@ -219,7 +267,7 @@ def test_home_dag_list_search(working_dags, user_client):
     check_content_not_in_response("dag_id=a_first_dag_id_asc", resp)
 
 
-def test_home_dag_edit_permissions(capture_templates, working_dags, client_single_dag_edit):
+def test_home_dag_edit_permissions(capture_templates, working_dags_with_edit_perm, client_single_dag_edit):
     with capture_templates() as templates:
         client_single_dag_edit.get("home", follow_redirects=True)
 

Reply via email to