This is an automated email from the ASF dual-hosted git repository. beto pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push: new 2c6f581fa6 feat(postgresql): dynamic schema (#23401) 2c6f581fa6 is described below commit 2c6f581fa621033efc7d1c8699dd386539a03db8 Author: Beto Dealmeida <robe...@dealmeida.net> AuthorDate: Fri Mar 17 17:53:42 2023 -0700 feat(postgresql): dynamic schema (#23401) --- superset/db_engine_specs/base.py | 41 +++++++++------- superset/db_engine_specs/drill.py | 18 ++++--- superset/db_engine_specs/hive.py | 16 +++--- superset/db_engine_specs/mysql.py | 14 +++--- superset/db_engine_specs/postgres.py | 60 ++++++++++++++++++----- superset/db_engine_specs/presto.py | 20 +++++--- superset/db_engine_specs/snowflake.py | 22 +++++---- superset/models/core.py | 50 +++++++++++++++---- tests/unit_tests/db_engine_specs/test_postgres.py | 24 +++++++++ 9 files changed, 187 insertions(+), 78 deletions(-) diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py index 26dd169dc0..b8b1662057 100644 --- a/superset/db_engine_specs/base.py +++ b/superset/db_engine_specs/base.py @@ -371,7 +371,7 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods supports_file_upload = True # Is the DB engine spec able to change the default schema? This requires implementing - # a custom `adjust_database_uri` method. + # a custom `adjust_engine_params` method. supports_dynamic_schema = False @classmethod @@ -472,7 +472,7 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods Determining the correct schema is crucial for managing access to data, so please make sure you understand this logic when working on a new DB engine spec. """ - # default schema varies on a per-query basis + # dynamic schema varies on a per-query basis if cls.supports_dynamic_schema: return query.schema @@ -1057,30 +1057,33 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods ] @classmethod - def adjust_database_uri( # pylint: disable=unused-argument + def adjust_engine_params( # pylint: disable=unused-argument cls, uri: URL, - selected_schema: Optional[str], - ) -> URL: + connect_args: Dict[str, Any], + catalog: Optional[str] = None, + schema: Optional[str] = None, + ) -> Tuple[URL, Dict[str, Any]]: """ - Return a modified URL with a new database component. + Return a new URL and ``connect_args`` for a specific catalog/schema. + + This is used in SQL Lab, allowing users to select a schema from the list of + schemas available in a given database, and have the query run with that schema as + the default one. - The URI here represents the URI as entered when saving the database, - ``selected_schema`` is the schema currently active presumably in - the SQL Lab dropdown. Based on that, for some database engine, - we can return a new altered URI that connects straight to the - active schema, meaning the users won't have to prefix the object - names by the schema name. + For some databases (like MySQL, Presto, Snowflake) this requires modifying the + SQLAlchemy URI before creating the connection. For others (like Postgres), it + requires additional parameters in ``connect_args``. - Some databases engines have 2 level of namespacing: database and - schema (postgres, oracle, mssql, ...) - For those it's probably better to not alter the database - component of the URI with the schema name, it won't work. + When a DB engine spec implements this method it should also have the attribute + ``supports_dynamic_schema`` set to true, so that Superset knows in which schema a + given query is running in order to enforce permissions (see #23385 and #23401). - Some database drivers like Presto accept '{catalog}/{schema}' in - the database component of the URL, that can be handled here. + Currently, changing the catalog is not supported. The method acceps a catalog so + that when catalog support is added to Superse the interface remains the same. This + is important because DB engine specs can be installed from 3rd party packages. """ - return uri + return uri, connect_args @classmethod def patch(cls) -> None: diff --git a/superset/db_engine_specs/drill.py b/superset/db_engine_specs/drill.py index 4ae5ae59b3..16ac89212a 100644 --- a/superset/db_engine_specs/drill.py +++ b/superset/db_engine_specs/drill.py @@ -15,7 +15,7 @@ # specific language governing permissions and limitations # under the License. from datetime import datetime -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Tuple from urllib import parse from sqlalchemy import types @@ -71,13 +71,17 @@ class DrillEngineSpec(BaseEngineSpec): return None @classmethod - def adjust_database_uri(cls, uri: URL, selected_schema: Optional[str]) -> URL: - if selected_schema: - uri = uri.set( - database=parse.quote(selected_schema.replace(".", "/"), safe="") - ) + def adjust_engine_params( + cls, + uri: URL, + connect_args: Dict[str, Any], + catalog: Optional[str] = None, + schema: Optional[str] = None, + ) -> Tuple[URL, Dict[str, Any]]: + if schema: + uri = uri.set(database=parse.quote(schema.replace(".", "/"), safe="")) - return uri + return uri, connect_args @classmethod def get_schema_from_engine_params( diff --git a/superset/db_engine_specs/hive.py b/superset/db_engine_specs/hive.py index f90d889f8c..792ef94735 100644 --- a/superset/db_engine_specs/hive.py +++ b/superset/db_engine_specs/hive.py @@ -260,13 +260,17 @@ class HiveEngineSpec(PrestoEngineSpec): return None @classmethod - def adjust_database_uri( - cls, uri: URL, selected_schema: Optional[str] = None - ) -> URL: - if selected_schema: - uri = uri.set(database=parse.quote(selected_schema, safe="")) + def adjust_engine_params( + cls, + uri: URL, + connect_args: Dict[str, Any], + catalog: Optional[str] = None, + schema: Optional[str] = None, + ) -> Tuple[URL, Dict[str, Any]]: + if schema: + uri = uri.set(database=parse.quote(schema, safe="")) - return uri + return uri, connect_args @classmethod def get_schema_from_engine_params( diff --git a/superset/db_engine_specs/mysql.py b/superset/db_engine_specs/mysql.py index 04b8c68dd7..e5ff964f86 100644 --- a/superset/db_engine_specs/mysql.py +++ b/superset/db_engine_specs/mysql.py @@ -191,15 +191,17 @@ class MySQLEngineSpec(BaseEngineSpec, BasicParametersMixin): return None @classmethod - def adjust_database_uri( + def adjust_engine_params( cls, uri: URL, - selected_schema: Optional[str] = None, - ) -> URL: - if selected_schema: - uri = uri.set(database=parse.quote(selected_schema, safe="")) + connect_args: Dict[str, Any], + catalog: Optional[str] = None, + schema: Optional[str] = None, + ) -> Tuple[URL, Dict[str, Any]]: + if schema: + uri = uri.set(database=parse.quote(schema, safe="")) - return uri + return uri, connect_args @classmethod def get_schema_from_engine_params( diff --git a/superset/db_engine_specs/postgres.py b/superset/db_engine_specs/postgres.py index 84ddf56e10..fac0b1b1d0 100644 --- a/superset/db_engine_specs/postgres.py +++ b/superset/db_engine_specs/postgres.py @@ -72,12 +72,30 @@ COLUMN_DOES_NOT_EXIST_REGEX = re.compile( SYNTAX_ERROR_REGEX = re.compile('syntax error at or near "(?P<syntax_error>.*?)"') +def parse_options(connect_args: Dict[str, Any]) -> Dict[str, str]: + """ + Parse ``options`` from ``connect_args`` into a dictionary. + """ + if not isinstance(connect_args.get("options"), str): + return {} + + tokens = ( + tuple(token.strip() for token in option.strip().split("=", 1)) + for option in re.split(r"-c\s?", connect_args["options"]) + if "=" in option + ) + + return {token[0]: token[1] for token in tokens} + + class PostgresBaseEngineSpec(BaseEngineSpec): """Abstract class for Postgres 'like' databases""" engine = "" engine_name = "PostgreSQL" + supports_dynamic_schema = True + _time_grain_expressions = { None: "{col}", "PT1S": "DATE_TRUNC('second', {col})", @@ -147,6 +165,25 @@ class PostgresBaseEngineSpec(BaseEngineSpec): ), } + @classmethod + def adjust_engine_params( + cls, + uri: URL, + connect_args: Dict[str, Any], + catalog: Optional[str] = None, + schema: Optional[str] = None, + ) -> Tuple[URL, Dict[str, Any]]: + if not schema: + return uri, connect_args + + options = parse_options(connect_args) + options["search_path"] = schema + connect_args["options"] = " ".join( + f"-c{key}={value}" for key, value in options.items() + ) + + return uri, connect_args + @classmethod def get_schema_from_engine_params( cls, @@ -166,19 +203,16 @@ class PostgresBaseEngineSpec(BaseEngineSpec): to determine the schema for a non-qualified table in a query. In cases like that we raise an exception. """ - options = re.split(r"-c\s?", connect_args.get("options", "")) - for option in options: - if "=" not in option: - continue - key, value = option.strip().split("=", 1) - if key.strip() == "search_path": - if "," in value: - raise Exception( - "Multiple schemas are configured in the search path, which means " - "Superset is unable to determine the schema of unqualified table " - "names and enforce permissions." - ) - return value.strip() + options = parse_options(connect_args) + if search_path := options.get("search_path"): + schemas = search_path.split(",") + if len(schemas) > 1: + raise Exception( + "Multiple schemas are configured in the search path, which means " + "Superset is unable to determine the schema of unqualified table " + "names and enforce permissions." + ) + return schemas[0] return None diff --git a/superset/db_engine_specs/presto.py b/superset/db_engine_specs/presto.py index dd7bd88cdb..c0b4f2c6dd 100644 --- a/superset/db_engine_specs/presto.py +++ b/superset/db_engine_specs/presto.py @@ -301,19 +301,23 @@ class PrestoBaseEngineSpec(BaseEngineSpec, metaclass=ABCMeta): return "from_unixtime({col})" @classmethod - def adjust_database_uri( - cls, uri: URL, selected_schema: Optional[str] = None - ) -> URL: + def adjust_engine_params( + cls, + uri: URL, + connect_args: Dict[str, Any], + catalog: Optional[str] = None, + schema: Optional[str] = None, + ) -> Tuple[URL, Dict[str, Any]]: database = uri.database - if selected_schema and database: - selected_schema = parse.quote(selected_schema, safe="") + if schema and database: + schema = parse.quote(schema, safe="") if "/" in database: - database = database.split("/")[0] + "/" + selected_schema + database = database.split("/")[0] + "/" + schema else: - database += "/" + selected_schema + database += "/" + schema uri = uri.set(database=database) - return uri + return uri, connect_args @classmethod def get_schema_from_engine_params( diff --git a/superset/db_engine_specs/snowflake.py b/superset/db_engine_specs/snowflake.py index ba15eea7fb..033b637e48 100644 --- a/superset/db_engine_specs/snowflake.py +++ b/superset/db_engine_specs/snowflake.py @@ -135,17 +135,21 @@ class SnowflakeEngineSpec(PostgresBaseEngineSpec): return extra @classmethod - def adjust_database_uri( - cls, uri: URL, selected_schema: Optional[str] = None - ) -> URL: + def adjust_engine_params( + cls, + uri: URL, + connect_args: Dict[str, Any], + catalog: Optional[str] = None, + schema: Optional[str] = None, + ) -> Tuple[URL, Dict[str, Any]]: database = uri.database - if "/" in uri.database: - database = uri.database.split("/")[0] - if selected_schema: - selected_schema = parse.quote(selected_schema, safe="") - uri = uri.set(database=f"{database}/{selected_schema}") + if "/" in database: + database = database.split("/")[0] + if schema: + schema = parse.quote(schema, safe="") + uri = uri.set(database=f"{database}/{schema}") - return uri + return uri, connect_args @classmethod def get_schema_from_engine_params( diff --git a/superset/models/core.py b/superset/models/core.py index 5717726edc..d7a38cdc03 100755 --- a/superset/models/core.py +++ b/superset/models/core.py @@ -421,32 +421,58 @@ class Database( source: Optional[utils.QuerySource] = None, sqlalchemy_uri: Optional[str] = None, ) -> Engine: - extra = self.get_extra() sqlalchemy_url = make_url_safe( sqlalchemy_uri if sqlalchemy_uri else self.sqlalchemy_uri_decrypted ) self.db_engine_spec.validate_database_uri(sqlalchemy_url) - sqlalchemy_url = self.db_engine_spec.adjust_database_uri(sqlalchemy_url, schema) + extra = self.get_extra() + params = extra.get("engine_params", {}) + if nullpool: + params["poolclass"] = NullPool + connect_args = params.get("connect_args", {}) + + # The ``adjust_database_uri`` method was renamed to ``adjust_engine_params`` and + # had its signature changed in order to support more DB engine specs. Since DB + # engine specs can be released as 3rd party modules we want to make sure the old + # method is still supported so we don't introduce a breaking change. + if hasattr(self.db_engine_spec, "adjust_database_uri"): + sqlalchemy_url = self.db_engine_spec.adjust_database_uri( + sqlalchemy_url, + schema, + ) + logger.warning( + "DB engine spec %s implements the method `adjust_database_uri`, which is " + "deprecated and will be removed in version 3.0. Please update it to " + "implement `adjust_engine_params` instead.", + self.db_engine_spec, + ) + + sqlalchemy_url, connect_args = self.db_engine_spec.adjust_engine_params( + uri=sqlalchemy_url, + connect_args=connect_args, + catalog=None, + schema=schema, + ) + effective_username = self.get_effective_user(sqlalchemy_url) # If using MySQL or Presto for example, will set url.username # If using Hive, will not do anything yet since that relies on a # configuration parameter instead. sqlalchemy_url = self.db_engine_spec.get_url_for_impersonation( - sqlalchemy_url, self.impersonate_user, effective_username + sqlalchemy_url, + self.impersonate_user, + effective_username, ) masked_url = self.get_password_masked_url(sqlalchemy_url) logger.debug("Database._get_sqla_engine(). Masked URL: %s", str(masked_url)) - params = extra.get("engine_params", {}) - if nullpool: - params["poolclass"] = NullPool - - connect_args = params.get("connect_args", {}) if self.impersonate_user: self.db_engine_spec.update_impersonation_config( - connect_args, str(sqlalchemy_url), effective_username + connect_args, + str(sqlalchemy_url), + effective_username, ) if connect_args: @@ -464,7 +490,11 @@ class Database( source = utils.QuerySource.SQL_LAB sqlalchemy_url, params = DB_CONNECTION_MUTATOR( - sqlalchemy_url, params, effective_username, security_manager, source + sqlalchemy_url, + params, + effective_username, + security_manager, + source, ) try: return create_engine(sqlalchemy_url, **params) diff --git a/tests/unit_tests/db_engine_specs/test_postgres.py b/tests/unit_tests/db_engine_specs/test_postgres.py index e57e6a6f8e..fef8647959 100644 --- a/tests/unit_tests/db_engine_specs/test_postgres.py +++ b/tests/unit_tests/db_engine_specs/test_postgres.py @@ -131,3 +131,27 @@ def test_get_schema_from_engine_params() -> None: "Superset is unable to determine the schema of unqualified table " "names and enforce permissions." ) + + +def test_adjust_engine_params() -> None: + """ + Test the ``adjust_engine_params`` method. + """ + from superset.db_engine_specs.postgres import PostgresEngineSpec + + uri = make_url("postgres://user:password@host/catalog") + + assert PostgresEngineSpec.adjust_engine_params(uri, {}, None, "secret") == ( + uri, + {"options": "-csearch_path=secret"}, + ) + + assert PostgresEngineSpec.adjust_engine_params( + uri, + {"foo": "bar", "options": "-csearch_path=default -c debug=1"}, + None, + "secret", + ) == ( + uri, + {"foo": "bar", "options": "-csearch_path=secret -cdebug=1"}, + )