khalidmammadov commented on a change in pull request #18961:
URL: https://github.com/apache/airflow/pull/18961#discussion_r731740248
##########
File path: tests/www/test_security.py
##########
@@ -78,629 +77,745 @@ def some_action(self):
return "action!"
-class TestSecurity(unittest.TestCase):
- @classmethod
- def setUpClass(cls):
- settings.configure_orm()
- cls.session = settings.Session
- cls.app = application.create_app(testing=True)
- cls.appbuilder = cls.app.appbuilder
- cls.app.config['WTF_CSRF_ENABLED'] = False
- cls.security_manager = cls.appbuilder.sm
- cls.delete_roles()
-
- def setUp(self):
- clear_db_runs()
- clear_db_dags()
- self.db = SQLA(self.app)
- self.appbuilder.add_view(SomeBaseView, "SomeBaseView",
category="BaseViews")
- self.appbuilder.add_view(SomeModelView, "SomeModelView",
category="ModelViews")
-
- log.debug("Complete setup!")
-
- @classmethod
- def delete_roles(cls):
- for role_name in [
- 'team-a',
- 'MyRole1',
- 'MyRole5',
- 'Test_Role',
- 'MyRole3',
- 'MyRole2',
- 'dag_permission_role',
- ]:
- api_connexion_utils.delete_role(cls.app, role_name)
-
- def expect_user_is_in_role(self, user, rolename):
- self.security_manager.bulk_sync_roles([{'role': rolename, 'perms':
[]}])
- role = self.security_manager.find_role(rolename)
+def _clear_db():
+ clear_db_runs()
+ clear_db_dags()
+
+
+def _create_dag_permissions(dag_id, security_manager):
+ security_manager.sync_perm_for_dag(dag_id, access_control=None)
+
+
+def _delete_dag_permissions(dag_id, security_manager):
+ dag_resource_name = permissions.resource_name_for_dag(dag_id)
+ for dag_action_name in security_manager.DAG_ACTIONS:
+ security_manager.delete_permission(dag_action_name, dag_resource_name)
+
+
+def _create_dag(dag_id, session, security_manager):
+ dag_model = DagModel(dag_id=dag_id)
+ session.add(dag_model)
+ session.commit()
+ _create_dag_permissions(dag_id, security_manager)
+ return dag_model
+
+
+def _delete_dag(dag_model, session, security_manager):
+ session.delete(dag_model)
+ session.commit()
+ _delete_dag_permissions(dag_model.dag_id, security_manager)
+
+
+@contextlib.contextmanager
+def _create_dag_context(dag_id, session, security_manager):
+ dag = _create_dag(dag_id, session, security_manager)
+ yield dag
+ _delete_dag(dag, session, security_manager)
+
+
+@pytest.fixture(scope="module", autouse=True)
+def clear_db_after_suite():
+ yield None
+ _clear_db()
+
+
+@pytest.fixture(scope="function", autouse=True)
+def clear_db_before_test():
+ _clear_db()
+
+
+@pytest.fixture(scope="module")
+def app():
+ _app = application.create_app(testing=True)
+ _app.config['WTF_CSRF_ENABLED'] = False
+ return _app
+
+
+@pytest.fixture(scope="module")
+def app_builder(app):
+ app_builder = app.appbuilder
+ app_builder.add_view(SomeBaseView, "SomeBaseView", category="BaseViews")
+ app_builder.add_view(SomeModelView, "SomeModelView", category="ModelViews")
+ return app.appbuilder
+
+
+@pytest.fixture(scope="module")
+def security_manager(app_builder):
+ return app_builder.sm
+
+
+@pytest.fixture(scope="module")
+def session(app_builder):
+ return app_builder.get_session
+
+
+@pytest.fixture(scope="module")
+def db(app):
+ return SQLA(app)
+
+
+@pytest.fixture(scope="module")
+def expect_user_is_in_role(security_manager):
+ def check(user, role_name):
+ security_manager.bulk_sync_roles([{'role': role_name, 'perms': []}])
+ role = security_manager.find_role(role_name)
if not role:
- self.security_manager.add_role(rolename)
- role = self.security_manager.find_role(rolename)
+ security_manager.add_role(role_name)
+ role = security_manager.find_role(role_name)
user.roles = [role]
- self.security_manager.update_user(user)
+ security_manager.update_user(user)
+
+ return check
+
+
+@pytest.fixture(scope="module")
+def has_dag_perm(security_manager):
+ def _has_dag_perm(perm, dag_id, user):
+ return security_manager.has_access(perm,
permissions.resource_name_for_dag(dag_id), user)
- def assert_user_has_dag_perms(self, perms, dag_id, user=None):
+ return _has_dag_perm
+
+
+@pytest.fixture(scope="module")
+def assert_user_has_dag_perms(has_dag_perm):
+ def _assert_user_has_dag_perms(perms, dag_id, user=None):
for perm in perms:
- assert self._has_dag_perm(perm, dag_id, user), f"User should have
'{perm}' on DAG '{dag_id}'"
+ assert has_dag_perm(perm, dag_id, user), f"User should have
'{perm}' on DAG '{dag_id}'"
+
+ return _assert_user_has_dag_perms
- def assert_user_does_not_have_dag_perms(self, dag_id, perms, user=None):
+
+@pytest.fixture(scope="module")
+def assert_user_does_not_have_dag_perms(has_dag_perm):
+ def _assert_user_does_not_have_dag_perms(dag_id, perms, user=None):
for perm in perms:
- assert not self._has_dag_perm(
- perm, dag_id, user
- ), f"User should not have '{perm}' on DAG '{dag_id}'"
-
- def _has_dag_perm(self, perm, dag_id, user):
- return self.security_manager.has_access(perm,
permissions.resource_name_for_dag(dag_id), user)
-
- def _create_dag(self, dag_id):
- dag_model = DagModel(dag_id=dag_id)
- self.session.add(dag_model)
- self.session.commit()
- self.security_manager.sync_perm_for_dag(dag_id, access_control=None)
-
- def tearDown(self):
- clear_db_runs()
- clear_db_dags()
- self.appbuilder = None
- self.app = None
- self.db = None
- log.debug("Complete teardown!")
-
- def test_init_role_baseview(self):
- role_name = 'MyRole7'
- role_perms = [('can_some_other_action', 'AnotherBaseView')]
- with pytest.warns(
- DeprecationWarning,
- match="`init_role` has been deprecated\\. Please use
`bulk_sync_roles` instead\\.",
- ):
- self.security_manager.init_role(role_name, role_perms)
-
- role = self.appbuilder.sm.find_role(role_name)
- assert role is not None
- assert len(role_perms) == len(role.permissions)
-
- def test_bulk_sync_roles_baseview(self):
- role_name = 'MyRole3'
- role_perms = [('can_some_action', 'SomeBaseView')]
- self.security_manager.bulk_sync_roles([{'role': role_name, 'perms':
role_perms}])
-
- role = self.appbuilder.sm.find_role(role_name)
- assert role is not None
- assert len(role_perms) == len(role.permissions)
-
- def test_bulk_sync_roles_modelview(self):
- role_name = 'MyRole2'
- role_perms = [
- ('can_list', 'SomeModelView'),
- ('can_show', 'SomeModelView'),
- ('can_add', 'SomeModelView'),
- (permissions.ACTION_CAN_EDIT, 'SomeModelView'),
- (permissions.ACTION_CAN_DELETE, 'SomeModelView'),
- ]
- mock_roles = [{'role': role_name, 'perms': role_perms}]
- self.security_manager.bulk_sync_roles(mock_roles)
+ assert not has_dag_perm(perm, dag_id, user), f"User should not
have '{perm}' on DAG '{dag_id}'"
- role = self.appbuilder.sm.find_role(role_name)
- assert role is not None
- assert len(role_perms) == len(role.permissions)
+ return _assert_user_does_not_have_dag_perms
- # Check short circuit works
- with assert_queries_count(2): # One for permissionview, one for roles
- self.security_manager.bulk_sync_roles(mock_roles)
- def test_update_and_verify_permission_role(self):
- role_name = 'Test_Role'
- role_perms = []
- mock_roles = [{'role': role_name, 'perms': role_perms}]
- self.security_manager.bulk_sync_roles(mock_roles)
- role = self.security_manager.find_role(role_name)
+def test_init_role_baseview(app, security_manager):
+ role_name = 'MyRole7'
+ role_perms = [('can_some_other_action', 'AnotherBaseView')]
+ with pytest.warns(
+ DeprecationWarning,
+ match="`init_role` has been deprecated\\. Please use `bulk_sync_roles`
instead\\.",
+ ):
+ security_manager.init_role(role_name, role_perms)
- perm =
self.security_manager.get_permission(permissions.ACTION_CAN_EDIT,
permissions.RESOURCE_ROLE)
- self.security_manager.add_permission_role(role, perm)
- role_perms_len = len(role.permissions)
+ role = security_manager.find_role(role_name)
+ assert role is not None
+ assert len(role_perms) == len(role.permissions)
+ delete_role(app, role_name)
Review comment:
Cleanups are now done either in a fixture or by a context manager.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]