This is an automated email from the ASF dual-hosted git repository. vavila pushed a commit to branch feat/async-db-perm-sync in repository https://gitbox.apache.org/repos/asf/superset.git
commit 4c18f8bf6ca1ab259d503ac05c4c53e6a2c3a431 Author: Vitor Avila <[email protected]> AuthorDate: Thu Feb 6 23:42:47 2025 -0300 Initial setup for testing --- superset/commands/database/exceptions.py | 10 +++ superset/commands/database/resync_permissions.py | 79 ++++++++++++++++++++++++ superset/databases/api.py | 58 ++++++++++++++++- superset/tasks/permissions.py | 49 +++++++++++++++ 4 files changed, 195 insertions(+), 1 deletion(-) diff --git a/superset/commands/database/exceptions.py b/superset/commands/database/exceptions.py index 5285deb0f9..b80d7acfbe 100644 --- a/superset/commands/database/exceptions.py +++ b/superset/commands/database/exceptions.py @@ -88,11 +88,21 @@ class DatabaseExtraValidationError(ValidationError): ) +class DatabaseConnectionNotWorkingError(CommandException): + status = 400 + message = _("DB Connection not working, please check your connection settings.") + + class DatabaseNotFoundError(CommandException): status = 404 message = _("Database not found.") +class UserNotFoundError(CommandException): + status = 400 + message = _("User not found.") + + class DatabaseSchemaUploadNotAllowed(CommandException): status = 403 message = _("Database schema is not allowed for csv uploads.") diff --git a/superset/commands/database/resync_permissions.py b/superset/commands/database/resync_permissions.py new file mode 100644 index 0000000000..28d7879ec5 --- /dev/null +++ b/superset/commands/database/resync_permissions.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from contextlib import closing +from sqlite3 import ProgrammingError + +from flask import current_app as app +from sqlalchemy.engine import Engine + +from superset import security_manager +from superset.commands.base import BaseCommand +from superset.commands.database.exceptions import ( + DatabaseConnectionNotWorkingError, + DatabaseNotFoundError, + UserNotFoundError, +) +from superset.models.core import Database +from superset.utils.core import timeout + +logger = logging.getLogger(__name__) + + +class ResyncPermissionsCommand(BaseCommand): + def __init__(self, model: Database | None, username: str): + self._model = model + self._username = username + + def run(self) -> None: + self.validate() + + def validate(self) -> None: + """ + Validates the command. 
+ """ + if not self._model: + raise DatabaseNotFoundError() + + # If OAuth2 connection, we need to impersonate + # the current user to trigger the resync + if self._model.is_oauth2_enabled(): + if not security_manager.get_user_by_username(self._username): + raise UserNotFoundError() + + # Make sure the connection works before delegating the task + def ping(engine: Engine) -> bool: + with closing(engine.raw_connection()) as conn: + return engine.dialect.do_ping(conn) + + with self._model.get_sqla_engine() as engine: + try: + time_delta = app.config["TEST_DATABASE_CONNECTION_TIMEOUT"] + with timeout(int(time_delta.total_seconds())): + alive = ping(engine) + except (ProgrammingError, RuntimeError): + logger.warning("Raw connection failed, retrying with engine") + alive = engine.dialect.do_ping(engine) + except Exception as err: + logger.error("Could not stablish a DB connection") + raise DatabaseConnectionNotWorkingError() from err + + if not alive: + logger.error("Could not stablish a DB connection") + raise DatabaseConnectionNotWorkingError() diff --git a/superset/databases/api.py b/superset/databases/api.py index f9c1dfd122..08c0c41306 100644 --- a/superset/databases/api.py +++ b/superset/databases/api.py @@ -46,6 +46,7 @@ from superset.commands.database.exceptions import ( ) from superset.commands.database.export import ExportDatabasesCommand from superset.commands.database.importers.dispatcher import ImportDatabasesCommand +from superset.commands.database.resync_permissions import ResyncPermissionsCommand from superset.commands.database.ssh_tunnel.delete import DeleteSSHTunnelCommand from superset.commands.database.ssh_tunnel.exceptions import ( SSHTunnelDatabasePortError, @@ -119,8 +120,13 @@ from superset.extensions import security_manager from superset.models.core import Database from superset.sql_parse import Table from superset.superset_typing import FlaskResponse +from superset.tasks.permissions import resync_database_permissions from superset.utils import 
@expose("/<int:pk>/resync-permissions/", methods=("POST",))
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(
    action=lambda self, *args, **kwargs: (
        f"{self.__class__.__name__}.resync-permissions"
    ),
    log_to_statsd=False,
)
def resync_permissions(self, pk: int, **kwargs: Any) -> FlaskResponse:
    """Re-sync all permissions for a database connection.
    ---
    post:
      summary: Re-sync all permissions for a database connection
      parameters:
      - in: path
        schema:
          type: integer
        name: pk
        description: The database connection ID
      responses:
        202:
          description: Task created to resync permissions.
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
        400:
          $ref: '#/components/responses/400'
        401:
          $ref: '#/components/responses/401'
        404:
          $ref: '#/components/responses/404'
        500:
          $ref: '#/components/responses/500'
    """
    database = DatabaseDAO.find_by_id(pk)
    if not database:
        return self.response_404()
    try:
        current_username = get_username() or ""
        # The command calls methods on the model (OAuth2 check, engine
        # creation), so it needs the Database object itself, not its ID.
        ResyncPermissionsCommand(database, current_username).run()
        # NOTE(review): ``database`` is an ORM instance. Celery's default
        # task serializer is JSON, which cannot encode it — confirm the
        # configured serializer, or pass ``database.id`` once the task
        # accepts an ID.
        resync_database_permissions.delay(database, current_username)
        return self.response(202, message="OK")
    except SupersetException as ex:
        return self.response(ex.status, message=ex.message)
+from __future__ import annotations + +import logging + +from flask import g + +from superset import security_manager +from superset.commands.database.update import UpdateDatabaseCommand +from superset.daos.database import DatabaseDAO +from superset.extensions import celery_app +from superset.models.core import Database + +logger = logging.getLogger(__name__) + + +@celery_app.task(name="resync_database_permissions", soft_time_limit=600) +def resync_database_permissions( + database: Database, + username: str, +) -> None: + logger.info("Resyncing database permissions for connection ID %s", database.id) + if user := security_manager.get_user_by_username(username): + g.user = user + logger.info("Impersonating user ID %s", g.user.id) + cmmd = UpdateDatabaseCommand(database.id, {}) + try: + cmmd._refresh_catalogs( + database, database.name, DatabaseDAO.get_ssh_tunnel(database.id) + ) + except Exception as ex: + logger.error( + "An error occurred while resyncing database permissions", exc_info=ex + )
