This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch explorable
in repository https://gitbox.apache.org/repos/asf/superset.git

commit af551ac2c1392f575ecead208ae59a4ed54fd88a
Author: Beto Dealmeida <[email protected]>
AuthorDate: Sun Oct 5 13:42:58 2025 -0400

    WIP
---
 superset/semantic_layers/__init__.py   |  16 ++
 superset/semantic_layers/snowflake_.py | 379 +++++++++++++++++++++++++++++++++
 2 files changed, 395 insertions(+)

diff --git a/superset/semantic_layers/__init__.py 
b/superset/semantic_layers/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/superset/semantic_layers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/superset/semantic_layers/snowflake_.py 
b/superset/semantic_layers/snowflake_.py
new file mode 100644
index 0000000000..9ab6b8e50d
--- /dev/null
+++ b/superset/semantic_layers/snowflake_.py
@@ -0,0 +1,379 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from typing import Any, Literal, Union
+
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import serialization
+from pydantic import (
+    BaseModel,
+    ConfigDict,
+    create_model,
+    Field,
+    model_validator,
+    SecretStr,
+)
+from snowflake.connector import connect
+from snowflake.connector.connection import SnowflakeConnection
+from snowflake.sqlalchemy.snowdialect import SnowflakeDialect
+
+
+class UserPasswordAuth(BaseModel):
+    """
+    Username and password authentication.
+    """
+
+    model_config = ConfigDict(title="Username and password")
+
+    auth_type: Literal["user_password"] = "user_password"
+    username: str = Field(description="The username to authenticate as.")
+    password: SecretStr = Field(
+        description="The password to authenticate with.",
+        repr=False,
+    )
+
+
+class PrivateKeyAuth(BaseModel):
+    """
+    Private key authentication.
+    """
+
+    model_config = ConfigDict(title="Private key")
+
+    auth_type: Literal["private_key"] = "private_key"
+    private_key: SecretStr = Field(
+        description="The private key to authenticate with, in PEM format.",
+        repr=False,
+    )
+    private_key_password: SecretStr = Field(
+        description="The password to decrypt the private key with.",
+        repr=False,
+    )
+
+
+class SnowflakeConfiguration(BaseModel):
+    """
+    Parameters needed to connect to Snowflake.
+    """
+
+    # account is the only required parameter
+    account_identifier: str = Field(
+        description="The Snowflake account identifier.",
+        json_schema_extra={"examples": ["abc12345"]},
+    )
+
+    role: str | None = Field(
+        default=None,
+        description="The default role to use.",
+        json_schema_extra={"examples": ["myrole"]},
+    )
+    warehouse: str | None = Field(
+        default=None,
+        description="The default warehouse to use.",
+        json_schema_extra={"examples": ["testwh"]},
+    )
+
+    auth: Union[UserPasswordAuth, PrivateKeyAuth] = Field(
+        discriminator="auth_type",
+        description="Authentication method",
+    )
+
+    # database and schema can be optionally provided; if not provided the user
+    # will be able to browse databases/schemas
+    database: str | None = Field(
+        default=None,
+        description="The default database to use.",
+        json_schema_extra={
+            "examples": ["testdb"],
+            "x-dynamic": True,
+            "x-dependsOn": ["account_identifier", "auth"],
+        },
+    )
+    allow_changing_database: bool = Field(
+        default=False,
+        description="Allow changing the default database.",
+    )
+    schema_: str | None = Field(
+        default=None,
+        description="The default schema to use.",
+        json_schema_extra={
+            "examples": ["public"],
+            "x-dynamic": True,
+            "x-dependsOn": ["account_identifier", "auth", "database"],
+        },
+        # `schema` is an attribute of `BaseModel` so it needs to be aliased
+        alias="schema",
+    )
+    allow_changing_schema: bool = Field(
+        default=False,
+        description="Allow changing the default schema.",
+    )
+
+    @model_validator(mode="after")
+    def validate_database_schema_settings(self) -> SnowflakeConfiguration:
+        """
+        Validate that if database or schema is not specified, the corresponding
+        allow_changing flag must be true.
+        """
+        if not self.database and not self.allow_changing_database:
+            raise ValueError(
+                "If no database is specified, allow_changing_database must be 
true"
+            )
+        if not self.schema_ and not self.allow_changing_schema:
+            raise ValueError(
+                "If no schema is specified, allow_changing_schema must be true"
+            )
+        return self
+
+
+class SnowflakeSemanticLayer:
+
+    configuration_schema = SnowflakeConfiguration
+
+    @classmethod
+    def get_configuration_schema(
+        cls,
+        configuration: SnowflakeConfiguration | None = None,
+    ) -> dict[str, Any]:
+        """
+        Get the JSON schema for the configuration needed to add the semantic 
layer.
+
+        A partial configuration can be sent to improve the schema. For example,
+        providing account and auth will allow the schema to provide a list of
+        databases; providing a database will allow the schema to provide a 
list of
+        schemas.
+
+        Note that database and schema can both be left empty when the semantic 
layer is
+        added to Superset; the user will then have to provide them when loading
+        explorables.
+        """
+        schema = cls.configuration_schema.model_json_schema()
+        properties = schema["properties"]
+
+        if configuration is None:
+            # set these to empty; they will be populated when a partial 
configuration is
+            # passed
+            properties["database"]["enum"] = []
+            properties["schema"]["enum"] = []
+
+            return schema
+
+        connection_parameters = cls._get_connection_parameters(configuration)
+        with connect(**connection_parameters) as connection:
+            if all(
+                getattr(configuration, dependency)
+                for dependency in properties["database"].get("x-dependsOn", [])
+            ):
+                options = cls._fetch_databases(connection)
+                properties["database"]["enum"] = list(options)
+
+            if all(
+                getattr(configuration, dependency)
+                for dependency in properties["schema"].get("x-dependsOn", [])
+            ):
+                options = cls._fetch_schemas(connection, 
configuration.database)
+                properties["schema"]["enum"] = list(options)
+
+        return schema
+
+    @classmethod
+    def get_runtime_schema(
+        cls,
+        configuration: SnowflakeConfiguration,
+    ) -> dict[str, Any]:
+        """
+        Get the JSON schema for the runtime parameters needed to load 
explorables.
+        """
+        fields: dict[str, tuple[type, Field]] = {}
+
+        connection_parameters = cls._get_connection_parameters(configuration)
+        with connect(**connection_parameters) as connection:
+            if not configuration.database or 
configuration.allow_changing_database:
+                options = cls._fetch_databases(connection)
+                fields["database"] = (
+                    Literal[*options],
+                    Field(description="The default database to use."),
+                )
+
+            if not configuration.schema_ or 
configuration.allow_changing_schema:
+                options = cls._fetch_schemas(connection, 
configuration.database)
+                fields["schema_"] = (
+                    Literal[*options],
+                    Field(description="The default schema to use.", 
alias="schema"),
+                )
+
+        return create_model("RuntimeParameters", **fields).model_json_schema()
+
+    @classmethod
+    def _fetch_databases(cls, connection: SnowflakeConnection) -> set[str]:
+        """
+        Fetch the list of databases available in the Snowflake account.
+
+        We use `SHOW DATABASES` instead of querying the information schema 
since it
+        allows to retrieve the list of databases without having to specify a 
database
+        when connecting.
+        """
+        cursor = connection.cursor()
+        cursor.execute("SHOW DATABASES")
+        return {row[1] for row in cursor}
+
+    @classmethod
+    def _fetch_schemas(
+        cls,
+        connection: SnowflakeConnection,
+        database: str | None,
+    ) -> set[str]:
+        """
+        Fetch the list of schemas available in a given database.
+        """
+        if not database:
+            return set()
+
+        cursor = connection.cursor()
+        query = """
+            SELECT SCHEMA_NAME
+            FROM INFORMATION_SCHEMA.SCHEMATA
+            WHERE CATALOG_NAME = ?
+        """
+        return {row[0] for row in cursor.execute(query, (database,))}
+
+    @classmethod
+    def _get_connection_parameters(
+        cls,
+        configuration: SnowflakeConfiguration,
+    ) -> dict[str, Any]:
+        """
+        Convert the configuration to connection parameters for the Snowflake 
connector.
+        """
+        params = {
+            "account": configuration.account_identifier,
+            "application": "Apache Superset",
+            "paramstyle": "qmark",
+            "insecure_mode": True,
+        }
+
+        if configuration.role:
+            params["role"] = configuration.role
+        if configuration.warehouse:
+            params["warehouse"] = configuration.warehouse
+        if configuration.database:
+            params["database"] = configuration.database
+        if configuration.schema_:
+            params["schema"] = configuration.schema_
+
+        auth = configuration.auth
+        if isinstance(auth, UserPasswordAuth):
+            params["user"] = auth.username
+            params["password"] = auth.password.get_secret_value()
+        elif isinstance(auth, PrivateKeyAuth):
+            pem_private_key = serialization.load_pem_private_key(
+                auth.private_key.encode(),
+                password=(
+                    auth.private_key_password.encode()
+                    if auth.private_key_password
+                    else None
+                ),
+                backend=default_backend(),
+            )
+            params["private_key"] = pem_private_key.private_bytes(
+                encoding=serialization.Encoding.DER,
+                format=serialization.PrivateFormat.PKCS8,
+                encryption_algorithm=serialization.NoEncryption(),
+            )
+        else:
+            raise ValueError("Unsupported authentication method")
+
+        return params
+
+    def __init__(self, configuration: SnowflakeConfiguration):
+        self.configuration = configuration
+
+    def get_explorables(
+        self,
+        runtime_configuration: BaseModel,
+    ) -> list[SnowflakeExplorable]:
+        """
+        Get a list of available explorables (databases/schemas).
+        """
+        # create a new configuration with the runtime parameters
+        configuration = self.configuration.model_copy(
+            update=runtime_configuration.model_dump()
+        )
+
+        connection_parameters = self._get_connection_parameters(configuration)
+        with connect(**connection_parameters) as connection:
+            cursor = connection.cursor()
+            query = """
+                SHOW SEMANTIC VIEWS
+                    ->> SELECT "name" FROM $1;
+            """
+            return [
+                SnowflakeExplorable(configuration, row[0])
+                for row in cursor.execute(query)
+            ]
+
+
+class SnowflakeExplorable:
+    def __init__(self, configuration: SnowflakeConfiguration, name: str):
+        self.configuration = configuration
+        self.name = name
+
+        self._quote = SnowflakeDialect().identifier_preparer.quote
+
+    def __repr__(self) -> str:
+        return ".".join(
+            self._quote(part)
+            for part in (
+                self.configuration.database,
+                self.configuration.schema_,
+                self.name,
+            )
+        )
+
+
+if __name__ == "__main__":
+
+    configuration = SnowflakeConfiguration.model_validate(
+        {
+            "account_identifier": "KFTRUWN-VX32922",
+            "role": "ACCOUNTADMIN",
+            "warehouse": "COMPUTE_WH",
+            "database": "SAMPLE_DATA",
+            "schema": "TPCDS_SF10TCL",
+            "auth": {
+                "auth_type": "user_password",
+                "username": "vavila",
+                "password": "XXX",
+            },
+            "allow_changing_database": True,
+            "allow_changing_schema": True,
+        }
+    )
+    semantic_layer = SnowflakeSemanticLayer(configuration)
+    runtime_configuration = create_model(
+        "RuntimeParameters",
+        database=(Literal["SAMPLE_DATA"], Field()),
+        schema_=(Literal["TPCDS_SF10TCL"], Field(alias="schema")),
+    ).model_validate(
+        {
+            "database": "SAMPLE_DATA",
+            "schema": "TPCDS_SF10TCL",
+        }
+    )
+    print(semantic_layer.get_explorables(runtime_configuration))

Reply via email to