This is an automated email from the ASF dual-hosted git repository. beto pushed a commit to branch explorable in repository https://gitbox.apache.org/repos/asf/superset.git
commit b602cdeff13f9ef14b2c2e0c9a8e622581dcb9d1 Author: Beto Dealmeida <[email protected]> AuthorDate: Mon Oct 6 08:14:30 2025 -0400 WIP --- superset/semantic_layers/snowflake_.py | 241 +++++++++++++++++++++++++-------- 1 file changed, 183 insertions(+), 58 deletions(-) diff --git a/superset/semantic_layers/snowflake_.py b/superset/semantic_layers/snowflake_.py index 9ab6b8e50d..6212597a58 100644 --- a/superset/semantic_layers/snowflake_.py +++ b/superset/semantic_layers/snowflake_.py @@ -17,6 +17,9 @@ from __future__ import annotations +import itertools +import re +from collections import defaultdict from typing import Any, Literal, Union from cryptography.hazmat.backends import default_backend @@ -29,10 +32,26 @@ from pydantic import ( model_validator, SecretStr, ) -from snowflake.connector import connect +from snowflake.connector import connect, DictCursor from snowflake.connector.connection import SnowflakeConnection from snowflake.sqlalchemy.snowdialect import SnowflakeDialect +from superset.semantic_layers.types import ( + BINARY, + BOOLEAN, + DATE, + DATETIME, + DECIMAL, + Dimension, + INTEGER, + Metric, + NUMBER, + OBJECT, + STRING, + TIME, + Type, +) + class UserPasswordAuth(BaseModel): """ @@ -142,6 +161,51 @@ class SnowflakeConfiguration(BaseModel): return self +def get_connection_parameters(configuration: SnowflakeConfiguration) -> dict[str, Any]: + """ + Convert the configuration to connection parameters for the Snowflake connector. 
+ """ + params = { + "account": configuration.account_identifier, + "application": "Apache Superset", + "paramstyle": "qmark", + "insecure_mode": True, + } + + if configuration.role: + params["role"] = configuration.role + if configuration.warehouse: + params["warehouse"] = configuration.warehouse + if configuration.database: + params["database"] = configuration.database + if configuration.schema_: + params["schema"] = configuration.schema_ + + auth = configuration.auth + if isinstance(auth, UserPasswordAuth): + params["user"] = auth.username + params["password"] = auth.password.get_secret_value() + elif isinstance(auth, PrivateKeyAuth): + pem_private_key = serialization.load_pem_private_key( + auth.private_key.encode(), + password=( + auth.private_key_password.encode() + if auth.private_key_password + else None + ), + backend=default_backend(), + ) + params["private_key"] = pem_private_key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + else: + raise ValueError("Unsupported authentication method") + + return params + + class SnowflakeSemanticLayer: configuration_schema = SnowflakeConfiguration @@ -174,7 +238,7 @@ class SnowflakeSemanticLayer: return schema - connection_parameters = cls._get_connection_parameters(configuration) + connection_parameters = get_connection_parameters(configuration) with connect(**connection_parameters) as connection: if all( getattr(configuration, dependency) @@ -202,7 +266,7 @@ class SnowflakeSemanticLayer: """ fields: dict[str, tuple[type, Field]] = {} - connection_parameters = cls._get_connection_parameters(configuration) + connection_parameters = get_connection_parameters(configuration) with connect(**connection_parameters) as connection: if not configuration.database or configuration.allow_changing_database: options = cls._fetch_databases(connection) @@ -253,61 +317,13 @@ class SnowflakeSemanticLayer: """ return {row[0] for 
row in cursor.execute(query, (database,))} - @classmethod - def _get_connection_parameters( - cls, - configuration: SnowflakeConfiguration, - ) -> dict[str, Any]: - """ - Convert the configuration to connection parameters for the Snowflake connector. - """ - params = { - "account": configuration.account_identifier, - "application": "Apache Superset", - "paramstyle": "qmark", - "insecure_mode": True, - } - - if configuration.role: - params["role"] = configuration.role - if configuration.warehouse: - params["warehouse"] = configuration.warehouse - if configuration.database: - params["database"] = configuration.database - if configuration.schema_: - params["schema"] = configuration.schema_ - - auth = configuration.auth - if isinstance(auth, UserPasswordAuth): - params["user"] = auth.username - params["password"] = auth.password.get_secret_value() - elif isinstance(auth, PrivateKeyAuth): - pem_private_key = serialization.load_pem_private_key( - auth.private_key.encode(), - password=( - auth.private_key_password.encode() - if auth.private_key_password - else None - ), - backend=default_backend(), - ) - params["private_key"] = pem_private_key.private_bytes( - encoding=serialization.Encoding.DER, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption(), - ) - else: - raise ValueError("Unsupported authentication method") - - return params - def __init__(self, configuration: SnowflakeConfiguration): self.configuration = configuration def get_explorables( self, runtime_configuration: BaseModel, - ) -> list[SnowflakeExplorable]: + ) -> set[SnowflakeExplorable]: """ Get a list of available explorables (databases/schemas). 
""" @@ -316,17 +332,17 @@ class SnowflakeSemanticLayer: update=runtime_configuration.model_dump() ) - connection_parameters = self._get_connection_parameters(configuration) + connection_parameters = get_connection_parameters(configuration) with connect(**connection_parameters) as connection: cursor = connection.cursor() query = """ SHOW SEMANTIC VIEWS ->> SELECT "name" FROM $1; """ - return [ + return { SnowflakeExplorable(configuration, row[0]) for row in cursor.execute(query) - ] + } class SnowflakeExplorable: @@ -336,7 +352,7 @@ class SnowflakeExplorable: self._quote = SnowflakeDialect().identifier_preparer.quote - def __repr__(self) -> str: + def uid(self) -> str: return ".".join( self._quote(part) for part in ( @@ -346,6 +362,105 @@ class SnowflakeExplorable: ) ) + def get_dimensions(self) -> set[Dimension]: + """ + Get the dimensions defined in the explorable. + """ + dimensions: set[Dimension] = set() + + query = f""" + DESC SEMANTIC VIEW {self.uid()} + ->> SELECT "object_name", "property", "property_value" + FROM $1 + WHERE + "object_kind" = 'DIMENSION' AND + "property" IN ('COMMENT', 'DATA_TYPE', 'EXPRESSION', 'TABLE'); + """ # noqa: S608 + + connection_parameters = get_connection_parameters(self.configuration) + with connect(**connection_parameters) as connection: + cursor = connection.cursor(DictCursor) + rows = cursor.execute(query).fetchall() + + for name, group in itertools.groupby(rows, key=lambda x: x["object_name"]): + attributes = defaultdict(set) + for row in group: + attributes[row["property"]].add(row["property_value"]) + + table = next(iter(attributes["TABLE"])) + id_ = table + "." 
+ name + type_ = self._get_type(next(iter(attributes["DATA_TYPE"]))) + description = next(iter(attributes["COMMENT"]), None) + definition = next(iter(attributes["EXPRESSION"]), None) + + dimensions.add(Dimension(id_, name, type_, description, definition)) + + return dimensions + + def get_metrics(self) -> set[Metric]: + """ + Get the metrics defined in the explorable. + """ + metrics: set[Metric] = set() + + query = f""" + DESC SEMANTIC VIEW {self.uid()} + ->> SELECT "object_name", "property", "property_value" + FROM $1 + WHERE + "object_kind" = 'METRIC' AND + "property" IN ('COMMENT', 'DATA_TYPE', 'EXPRESSION', 'TABLE'); + """ # noqa: S608 + + connection_parameters = get_connection_parameters(self.configuration) + with connect(**connection_parameters) as connection: + cursor = connection.cursor(DictCursor) + rows = cursor.execute(query).fetchall() + + for name, group in itertools.groupby(rows, key=lambda x: x["object_name"]): + attributes = defaultdict(set) + for row in group: + attributes[row["property"]].add(row["property_value"]) + + table = next(iter(attributes["TABLE"])) + id_ = table + "." + name + type_ = self._get_type(next(iter(attributes["DATA_TYPE"]))) + description = next(iter(attributes["COMMENT"]), None) + definition = next(iter(attributes["EXPRESSION"]), None) + + metrics.add(Metric(id_, name, type_, definition, description)) + + return metrics + + def _get_type(self, snowflake_type: str | None) -> type[Type]: + """ + Return the semantic type corresponding to a Snowflake type. 
+ """ + if snowflake_type is None: + return STRING + + type_map = { + STRING: {r"VARCHAR\(\d+\)$", "STRING$", "TEXT$", r"CHAR\(\d+\)$"}, + INTEGER: {r"NUMBER\(38,\s?0\)$", "INT$", "INTEGER$", "BIGINT$"}, + DECIMAL: {r"NUMBER\(10,\s?2\)$"}, + NUMBER: {r"NUMBER\(\d+,\s?\d+\)$", "FLOAT$", "DOUBLE$"}, + BOOLEAN: {"BOOLEAN$"}, + DATE: {"DATE$"}, + DATETIME: {"TIMESTAMP_TZ$", "TIMESTAMP__NTZ$"}, + TIME: {"TIME$"}, + OBJECT: {"OBJECT$"}, + BINARY: {r"BINARY\(\d+\)$", r"VARBINARY\(\d+\)$"}, + } + for semantic_type, patterns in type_map.items(): + if any( + re.match(pattern, snowflake_type, re.IGNORECASE) for pattern in patterns + ): + return semantic_type + + return STRING + + __repr__ = uid + if __name__ == "__main__": @@ -359,7 +474,7 @@ if __name__ == "__main__": "auth": { "auth_type": "user_password", "username": "vavila", - "password": "XXX", + "password": "[REDACTED]", }, "allow_changing_database": True, "allow_changing_schema": True, @@ -376,4 +491,14 @@ if __name__ == "__main__": "schema": "TPCDS_SF10TCL", } ) - print(semantic_layer.get_explorables(runtime_configuration)) + explorables = semantic_layer.get_explorables(runtime_configuration) + print(explorables) + explorable = next(iter(explorables)) + print("DIMENSIONS") + print("==========") + for dimension in explorable.get_dimensions(): + print(dimension) + print("METRICS") + print("=======") + for metric in explorable.get_metrics(): + print(metric)
