This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2c0c8b8bfb Fix DAG.access_control can't sync when clean access_control
(#30340)
2c0c8b8bfb is described below
commit 2c0c8b8bfb5287e10dc40b73f326bbf9a0437bb1
Author: Huy Mạc Quang <[email protected]>
AuthorDate: Wed Apr 26 21:11:14 2023 +0700
Fix DAG.access_control can't sync when clean access_control (#30340)
* Reset permission if `access_control` is empty
* Check `resource` before call `_revoke_all_stale_permissions`
* Fix static checks
---
airflow/models/dagbag.py | 28 ++++----------------
airflow/www/security.py | 17 ++++++++++++
tests/models/test_dagbag.py | 3 +--
tests/www/views/test_views_home.py | 54 +++++++++++++++++++++++++++++++++++---
4 files changed, 74 insertions(+), 28 deletions(-)
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 3d00ac973f..305009e2ea 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -689,29 +689,11 @@ class DagBag(LoggingMixin):
@classmethod
@provide_session
def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
- """Sync DAG specific permissions, if necessary"""
- from airflow.security.permissions import DAG_ACTIONS,
resource_name_for_dag
- from airflow.www.fab_security.sqla.models import Action, Permission,
Resource
-
+ """Sync DAG specific permissions"""
root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id
- def needs_perms(dag_id: str) -> bool:
- dag_resource_name = resource_name_for_dag(dag_id)
- for permission_name in DAG_ACTIONS:
- if not (
- session.query(Permission)
- .join(Action)
- .join(Resource)
- .filter(Action.name == permission_name)
- .filter(Resource.name == dag_resource_name)
- .one_or_none()
- ):
- return True
- return False
-
- if dag.access_control or needs_perms(root_dag_id):
- cls.logger().debug("Syncing DAG permissions: %s to the DB",
root_dag_id)
- from airflow.www.security import ApplessAirflowSecurityManager
+ cls.logger().debug("Syncing DAG permissions: %s to the DB",
root_dag_id)
+ from airflow.www.security import ApplessAirflowSecurityManager
- security_manager = ApplessAirflowSecurityManager(session=session)
- security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
+ security_manager = ApplessAirflowSecurityManager(session=session)
+ security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 201c9ada0a..50c484082c 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -655,8 +655,25 @@ class AirflowSecurityManager(SecurityManager,
LoggingMixin):
for dag_action_name in self.DAG_ACTIONS:
self.create_permission(dag_action_name, dag_resource_name)
+ def _revoke_all_stale_permissions(resource: Resource):
+ existing_dag_perms = self.get_resource_permissions(resource)
+ for perm in existing_dag_perms:
+ non_admin_roles = [role for role in perm.role if role.name !=
"Admin"]
+ for role in non_admin_roles:
+ self.log.info(
+ "Revoking '%s' on DAG '%s' for role '%s'",
+ perm.action,
+ dag_resource_name,
+ role.name,
+ )
+ self.remove_permission_from_role(role, perm)
+
if access_control:
self._sync_dag_view_permissions(dag_resource_name, access_control)
+ else:
+ resource = self.get_resource(dag_resource_name)
+ if resource:
+ _revoke_all_stale_permissions(resource)
def _sync_dag_view_permissions(self, dag_id: str, access_control:
dict[str, Collection[str]]) -> None:
"""
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index fd4d5bdc82..22ff525bb8 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -908,7 +908,6 @@ class TestDagBag:
def test_sync_perm_for_dag(self, mock_security_manager):
"""
Test that dagbag._sync_perm_for_dag will call
ApplessAirflowSecurityManager.sync_perm_for_dag
- when DAG specific perm views don't exist already or the DAG has
access_control set.
"""
db_clean_up()
with create_session() as session:
@@ -932,7 +931,7 @@ class TestDagBag:
# perms now exist
_sync_perms()
- mock_sync_perm_for_dag.assert_not_called()
+
mock_sync_perm_for_dag.assert_called_once_with("test_example_bash_operator",
None)
# Always sync if we have access_control
dag.access_control = {"Public": {"can_read"}}
diff --git a/tests/www/views/test_views_home.py
b/tests/www/views/test_views_home.py
index 6619bcdf71..045e34b002 100644
--- a/tests/www/views/test_views_home.py
+++ b/tests/www/views/test_views_home.py
@@ -155,6 +155,44 @@ def working_dags(tmpdir):
_process_file(filename, session)
[email protected]()
+def working_dags_with_read_perm(tmpdir):
+ dag_contents_template = "from airflow import DAG\ndag = DAG('{}',
tags=['{}'])"
+ dag_contents_template_with_read_perm = (
+ "from airflow import DAG\ndag = DAG('{}', tags=['{}'], "
+ "access_control={{'role_single_dag':{{'can_read'}}}}) "
+ )
+ with create_session() as session:
+ for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)):
+ filename = os.path.join(tmpdir, f"{dag_id}.py")
+ if dag_id == "filter_test_1":
+ with open(filename, "w") as f:
+
f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag))
+ else:
+ with open(filename, "w") as f:
+ f.writelines(dag_contents_template.format(dag_id, tag))
+ _process_file(filename, session)
+
+
[email protected]()
+def working_dags_with_edit_perm(tmpdir):
+ dag_contents_template = "from airflow import DAG\ndag = DAG('{}',
tags=['{}'])"
+ dag_contents_template_with_read_perm = (
+ "from airflow import DAG\ndag = DAG('{}', tags=['{}'], "
+ "access_control={{'role_single_dag':{{'can_edit'}}}}) "
+ )
+ with create_session() as session:
+ for dag_id, tag in list(zip(TEST_FILTER_DAG_IDS, TEST_TAGS)):
+ filename = os.path.join(tmpdir, f"{dag_id}.py")
+ if dag_id == "filter_test_1":
+ with open(filename, "w") as f:
+
f.writelines(dag_contents_template_with_read_perm.format(dag_id, tag))
+ else:
+ with open(filename, "w") as f:
+ f.writelines(dag_contents_template.format(dag_id, tag))
+ _process_file(filename, session)
+
+
@pytest.fixture()
def broken_dags(tmpdir, working_dags):
with create_session() as session:
@@ -165,6 +203,16 @@ def broken_dags(tmpdir, working_dags):
_process_file(filename, session)
[email protected]()
+def broken_dags_with_read_perm(tmpdir, working_dags_with_read_perm):
+ with create_session() as session:
+ for dag_id in TEST_FILTER_DAG_IDS:
+ filename = os.path.join(tmpdir, f"{dag_id}.py")
+ with open(filename, "w") as f:
+ f.writelines("airflow DAG")
+ _process_file(filename, session)
+
+
def test_home_filter_tags(working_dags, admin_client):
with admin_client:
admin_client.get("home?tags=example&tags=data", follow_redirects=True)
@@ -183,7 +231,7 @@ def test_home_importerrors(broken_dags, user_client):
@pytest.mark.parametrize("page", ["home", "home?status=active",
"home?status=paused", "home?status=all"])
-def test_home_importerrors_filtered_singledag_user(broken_dags,
client_single_dag, page):
+def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm,
client_single_dag, page):
# Users that can only see certain DAGs get a filtered list of import errors
resp = client_single_dag.get(page, follow_redirects=True)
check_content_in_response("Import Errors", resp)
@@ -201,7 +249,7 @@ def test_home_dag_list(working_dags, user_client):
check_content_in_response(f"dag_id={dag_id}", resp)
-def test_home_dag_list_filtered_singledag_user(working_dags,
client_single_dag):
+def test_home_dag_list_filtered_singledag_user(working_dags_with_read_perm,
client_single_dag):
# Users that can only see certain DAGs get a filtered list
resp = client_single_dag.get("home", follow_redirects=True)
# They can see the first DAG
@@ -219,7 +267,7 @@ def test_home_dag_list_search(working_dags, user_client):
check_content_not_in_response("dag_id=a_first_dag_id_asc", resp)
-def test_home_dag_edit_permissions(capture_templates, working_dags,
client_single_dag_edit):
+def test_home_dag_edit_permissions(capture_templates,
working_dags_with_edit_perm, client_single_dag_edit):
with capture_templates() as templates:
client_single_dag_edit.get("home", follow_redirects=True)