This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit b51a9c618ae9c13e7f67b800cc09ec3446f45ecb
Author: Jed Cunningham <[email protected]>
AuthorDate: Thu Mar 25 09:37:46 2021 -0600

    Speed up webserver start when there are many DAGs (#14993)

    This fixes a short circuit in `create_dag_specific_permissions` to avoid needlessly querying permissions for every single DAG, and changes `get_all_permissions` to run 1 query instead of many.

    With ~5k DAGs, these changes speed up `create_dag_specific_permissions` by more than 65 seconds each call (on my machine), and since that method is called twice before the webserver actually responds to requests, this effectively speeds up the webserver startup by over 2 minutes.

    (cherry picked from commit 35fbb726498e3090258a89c7819b2ff3266948f6)
---
 airflow/www/security.py    | 22 +++++++++++-----------
 tests/www/test_security.py | 32 ++++++++++++++++++++++++++++++++
 2 files changed, 43 insertions(+), 11 deletions(-)

diff --git a/airflow/www/security.py b/airflow/www/security.py
index 5201ef6..8a2b10b 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -457,17 +457,17 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
 
         self.get_session.commit()
 
-    def get_all_permissions(self):
+    def get_all_permissions(self) -> Set[Tuple[str, str]]:
         """Returns all permissions as a set of tuples with the perm name and view menu name"""
-        perms = set()
-        for permission_view in self.get_session.query(self.permissionview_model).all():
-            if permission_view.permission and permission_view.view_menu:
-                perms.add((permission_view.permission.name, permission_view.view_menu.name))
-
-        return perms
+        return set(
+            self.get_session.query(self.permissionview_model)
+            .join(self.permission_model)
+            .join(self.viewmenu_model)
+            .with_entities(self.permission_model.name, self.viewmenu_model.name)
+            .all()
+        )
 
-    @provide_session
-    def create_dag_specific_permissions(self, session=None):
+    def create_dag_specific_permissions(self) -> None:
         """
         Creates 'can_read' and 'can_edit' permissions for all active and paused DAGs.
 
@@ -475,7 +475,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
         """
         perms = self.get_all_permissions()
         rows = (
-            session.query(models.DagModel.dag_id)
+            self.get_session.query(models.DagModel.dag_id)
             .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))
             .all()
         )
@@ -484,7 +484,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):  # pylint: disable=
             dag_id = row[0]
             for perm_name in self.DAG_PERMS:
                 dag_resource_name = self.prefixed_dag_id(dag_id)
-                if dag_resource_name and perm_name and (dag_resource_name, perm_name) not in perms:
+                if (perm_name, dag_resource_name) not in perms:
                     self._merge_perm(perm_name, dag_resource_name)
 
     def update_admin_perm_view(self):
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 9179be2..2ae2608 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -33,6 +33,7 @@ from airflow.security import permissions
 from airflow.www import app as application
 from airflow.www.utils import CustomSQLAInterface
 from tests.test_utils import fab_utils
+from tests.test_utils.asserts import assert_queries_count
 from tests.test_utils.db import clear_db_dags, clear_db_runs
 from tests.test_utils.mock_security_manager import MockSecurityManager
 
@@ -542,3 +543,34 @@ class TestSecurity(unittest.TestCase):
                 f"{role.name} should not have {permissions.ACTION_CAN_READ} "
                 f"on {permissions.RESOURCE_CONFIG}"
             )
+
+    def test_create_dag_specific_permissions(self):
+        dag_id = 'some_dag_id'
+        dag_permission_name = self.security_manager.prefixed_dag_id(dag_id)
+        assert ('can_read', dag_permission_name) not in self.security_manager.get_all_permissions()
+
+        dag_model = DagModel(
+            dag_id=dag_id, fileloc='/tmp/dag_.py', schedule_interval='2 2 * * *', is_paused=True
+        )
+        self.session.add(dag_model)
+        self.session.commit()
+
+        self.security_manager.create_dag_specific_permissions()
+        self.session.commit()
+
+        assert ('can_read', dag_permission_name) in self.security_manager.get_all_permissions()
+
+        # Make sure we short circuit when the perms already exist
+        with assert_queries_count(2):  # One query to get DagModels, one query to get all perms
+            self.security_manager.create_dag_specific_permissions()
+
+    def test_get_all_permissions(self):
+        with assert_queries_count(1):
+            perms = self.security_manager.get_all_permissions()
+
+        assert isinstance(perms, set)
+        for perm in perms:
+            assert isinstance(perm, tuple)
+            assert len(perm) == 2
+
+        assert ('can_read', 'Connections') in perms
