This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch explorable
in repository https://gitbox.apache.org/repos/asf/superset.git

commit b602cdeff13f9ef14b2c2e0c9a8e622581dcb9d1
Author: Beto Dealmeida <[email protected]>
AuthorDate: Mon Oct 6 08:14:30 2025 -0400

    WIP
---
 superset/semantic_layers/snowflake_.py | 241 +++++++++++++++++++++++++--------
 1 file changed, 183 insertions(+), 58 deletions(-)

diff --git a/superset/semantic_layers/snowflake_.py 
b/superset/semantic_layers/snowflake_.py
index 9ab6b8e50d..6212597a58 100644
--- a/superset/semantic_layers/snowflake_.py
+++ b/superset/semantic_layers/snowflake_.py
@@ -17,6 +17,9 @@
 
 from __future__ import annotations
 
+import itertools
+import re
+from collections import defaultdict
 from typing import Any, Literal, Union
 
 from cryptography.hazmat.backends import default_backend
@@ -29,10 +32,26 @@ from pydantic import (
     model_validator,
     SecretStr,
 )
-from snowflake.connector import connect
+from snowflake.connector import connect, DictCursor
 from snowflake.connector.connection import SnowflakeConnection
 from snowflake.sqlalchemy.snowdialect import SnowflakeDialect
 
+from superset.semantic_layers.types import (
+    BINARY,
+    BOOLEAN,
+    DATE,
+    DATETIME,
+    DECIMAL,
+    Dimension,
+    INTEGER,
+    Metric,
+    NUMBER,
+    OBJECT,
+    STRING,
+    TIME,
+    Type,
+)
+
 
 class UserPasswordAuth(BaseModel):
     """
@@ -142,6 +161,51 @@ class SnowflakeConfiguration(BaseModel):
         return self
 
 
+def get_connection_parameters(configuration: SnowflakeConfiguration) -> 
dict[str, Any]:
+    """
+    Convert the configuration to connection parameters for the Snowflake 
connector.
+    """
+    params = {
+        "account": configuration.account_identifier,
+        "application": "Apache Superset",
+        "paramstyle": "qmark",
+        "insecure_mode": True,  # NOTE(review): disables TLS cert validation — confirm intentional
+    }
+
+    if configuration.role:
+        params["role"] = configuration.role
+    if configuration.warehouse:
+        params["warehouse"] = configuration.warehouse
+    if configuration.database:
+        params["database"] = configuration.database
+    if configuration.schema_:
+        params["schema"] = configuration.schema_
+
+    auth = configuration.auth
+    if isinstance(auth, UserPasswordAuth):
+        params["user"] = auth.username
+        params["password"] = auth.password.get_secret_value()
+    elif isinstance(auth, PrivateKeyAuth):
+        pem_private_key = serialization.load_pem_private_key(
+            auth.private_key.encode(),
+            password=(
+                auth.private_key_password.encode()
+                if auth.private_key_password
+                else None
+            ),
+            backend=default_backend(),
+        )
+        params["private_key"] = pem_private_key.private_bytes(
+            encoding=serialization.Encoding.DER,
+            format=serialization.PrivateFormat.PKCS8,
+            encryption_algorithm=serialization.NoEncryption(),
+        )
+    else:
+        raise ValueError("Unsupported authentication method")
+
+    return params
+
+
 class SnowflakeSemanticLayer:
 
     configuration_schema = SnowflakeConfiguration
@@ -174,7 +238,7 @@ class SnowflakeSemanticLayer:
 
             return schema
 
-        connection_parameters = cls._get_connection_parameters(configuration)
+        connection_parameters = get_connection_parameters(configuration)
         with connect(**connection_parameters) as connection:
             if all(
                 getattr(configuration, dependency)
@@ -202,7 +266,7 @@ class SnowflakeSemanticLayer:
         """
         fields: dict[str, tuple[type, Field]] = {}
 
-        connection_parameters = cls._get_connection_parameters(configuration)
+        connection_parameters = get_connection_parameters(configuration)
         with connect(**connection_parameters) as connection:
             if not configuration.database or 
configuration.allow_changing_database:
                 options = cls._fetch_databases(connection)
@@ -253,61 +317,13 @@ class SnowflakeSemanticLayer:
         """
         return {row[0] for row in cursor.execute(query, (database,))}
 
-    @classmethod
-    def _get_connection_parameters(
-        cls,
-        configuration: SnowflakeConfiguration,
-    ) -> dict[str, Any]:
-        """
-        Convert the configuration to connection parameters for the Snowflake 
connector.
-        """
-        params = {
-            "account": configuration.account_identifier,
-            "application": "Apache Superset",
-            "paramstyle": "qmark",
-            "insecure_mode": True,
-        }
-
-        if configuration.role:
-            params["role"] = configuration.role
-        if configuration.warehouse:
-            params["warehouse"] = configuration.warehouse
-        if configuration.database:
-            params["database"] = configuration.database
-        if configuration.schema_:
-            params["schema"] = configuration.schema_
-
-        auth = configuration.auth
-        if isinstance(auth, UserPasswordAuth):
-            params["user"] = auth.username
-            params["password"] = auth.password.get_secret_value()
-        elif isinstance(auth, PrivateKeyAuth):
-            pem_private_key = serialization.load_pem_private_key(
-                auth.private_key.encode(),
-                password=(
-                    auth.private_key_password.encode()
-                    if auth.private_key_password
-                    else None
-                ),
-                backend=default_backend(),
-            )
-            params["private_key"] = pem_private_key.private_bytes(
-                encoding=serialization.Encoding.DER,
-                format=serialization.PrivateFormat.PKCS8,
-                encryption_algorithm=serialization.NoEncryption(),
-            )
-        else:
-            raise ValueError("Unsupported authentication method")
-
-        return params
-
     def __init__(self, configuration: SnowflakeConfiguration):
         self.configuration = configuration
 
     def get_explorables(
         self,
         runtime_configuration: BaseModel,
-    ) -> list[SnowflakeExplorable]:
+    ) -> set[SnowflakeExplorable]:
         """
         Get a list of available explorables (databases/schemas).
         """
@@ -316,17 +332,17 @@ class SnowflakeSemanticLayer:
             update=runtime_configuration.model_dump()
         )
 
-        connection_parameters = self._get_connection_parameters(configuration)
+        connection_parameters = get_connection_parameters(configuration)
         with connect(**connection_parameters) as connection:
             cursor = connection.cursor()
             query = """
                 SHOW SEMANTIC VIEWS
                     ->> SELECT "name" FROM $1;
             """
-            return [
+            return {
                 SnowflakeExplorable(configuration, row[0])
                 for row in cursor.execute(query)
-            ]
+            }
 
 
 class SnowflakeExplorable:
@@ -336,7 +352,7 @@ class SnowflakeExplorable:
 
         self._quote = SnowflakeDialect().identifier_preparer.quote
 
-    def __repr__(self) -> str:
+    def uid(self) -> str:
         return ".".join(
             self._quote(part)
             for part in (
@@ -346,6 +362,105 @@ class SnowflakeExplorable:
             )
         )
 
+    def get_dimensions(self) -> set[Dimension]:
+        """
+        Get the dimensions defined in the explorable.
+        """
+        dimensions: set[Dimension] = set()
+
+        query = f"""
+            DESC SEMANTIC VIEW {self.uid()}
+                ->> SELECT "object_name", "property", "property_value"
+                    FROM $1
+                    WHERE
+                        "object_kind" = 'DIMENSION' AND
+                        "property" IN ('COMMENT', 'DATA_TYPE', 'EXPRESSION', 
'TABLE');
+        """  # noqa: S608
+
+        connection_parameters = get_connection_parameters(self.configuration)
+        with connect(**connection_parameters) as connection:
+            cursor = connection.cursor(DictCursor)
+            rows = cursor.execute(query).fetchall()
+
+        for name, group in itertools.groupby(rows, key=lambda x: 
x["object_name"]):  # NOTE(review): groupby only merges consecutive keys — verify rows are ordered by object_name
+            attributes = defaultdict(set)
+            for row in group:
+                attributes[row["property"]].add(row["property_value"])
+
+            table = next(iter(attributes["TABLE"]))
+            id_ = table + "." + name
+            type_ = self._get_type(next(iter(attributes["DATA_TYPE"])))
+            description = next(iter(attributes["COMMENT"]), None)
+            definition = next(iter(attributes["EXPRESSION"]), None)
+
+            dimensions.add(Dimension(id_, name, type_, description, 
definition))
+
+        return dimensions
+
+    def get_metrics(self) -> set[Metric]:
+        """
+        Get the metrics defined in the explorable.
+        """
+        metrics: set[Metric] = set()
+
+        query = f"""
+            DESC SEMANTIC VIEW {self.uid()}
+                ->> SELECT "object_name", "property", "property_value"
+                    FROM $1
+                    WHERE
+                        "object_kind" = 'METRIC' AND
+                        "property" IN ('COMMENT', 'DATA_TYPE', 'EXPRESSION', 
'TABLE');
+        """  # noqa: S608
+
+        connection_parameters = get_connection_parameters(self.configuration)
+        with connect(**connection_parameters) as connection:
+            cursor = connection.cursor(DictCursor)
+            rows = cursor.execute(query).fetchall()
+
+        for name, group in itertools.groupby(rows, key=lambda x: 
x["object_name"]):
+            attributes = defaultdict(set)
+            for row in group:
+                attributes[row["property"]].add(row["property_value"])
+
+            table = next(iter(attributes["TABLE"]))
+            id_ = table + "." + name
+            type_ = self._get_type(next(iter(attributes["DATA_TYPE"])))
+            description = next(iter(attributes["COMMENT"]), None)
+            definition = next(iter(attributes["EXPRESSION"]), None)
+
+            metrics.add(Metric(id_, name, type_, definition, description))
+
+        return metrics
+
+    def _get_type(self, snowflake_type: str | None) -> type[Type]:
+        """
+        Return the semantic type corresponding to a Snowflake type.
+        """
+        if snowflake_type is None:
+            return STRING
+
+        type_map = {
+            STRING: {r"VARCHAR\(\d+\)$", "STRING$", "TEXT$", r"CHAR\(\d+\)$"},
+            INTEGER: {r"NUMBER\(38,\s?0\)$", "INT$", "INTEGER$", "BIGINT$"},
+            DECIMAL: {r"NUMBER\(10,\s?2\)$"},
+            NUMBER: {r"NUMBER\(\d+,\s?\d+\)$", "FLOAT$", "DOUBLE$"},
+            BOOLEAN: {"BOOLEAN$"},
+            DATE: {"DATE$"},
+            DATETIME: {"TIMESTAMP_TZ$", "TIMESTAMP_NTZ$"},
+            TIME: {"TIME$"},
+            OBJECT: {"OBJECT$"},
+            BINARY: {r"BINARY\(\d+\)$", r"VARBINARY\(\d+\)$"},
+        }
+        for semantic_type, patterns in type_map.items():
+            if any(
+                re.match(pattern, snowflake_type, re.IGNORECASE) for pattern 
in patterns
+            ):
+                return semantic_type
+
+        return STRING
+
+    __repr__ = uid
+
 
 if __name__ == "__main__":
 
@@ -359,7 +474,7 @@ if __name__ == "__main__":
             "auth": {
                 "auth_type": "user_password",
                 "username": "vavila",
-                "password": "XXX",
+                "password": "XXX",  # never commit real credentials; load from env/secret store
             },
             "allow_changing_database": True,
             "allow_changing_schema": True,
@@ -376,4 +491,14 @@ if __name__ == "__main__":
             "schema": "TPCDS_SF10TCL",
         }
     )
-    print(semantic_layer.get_explorables(runtime_configuration))
+    explorables = semantic_layer.get_explorables(runtime_configuration)
+    print(explorables)
+    explorable = next(iter(explorables))
+    print("DIMENSIONS")
+    print("==========")
+    for dimension in explorable.get_dimensions():
+        print(dimension)
+    print("METRICS")
+    print("=======")
+    for metric in explorable.get_metrics():
+        print(metric)

Reply via email to