This is an automated email from the ASF dual-hosted git repository. rusackas pushed a commit to branch rusackas/fix-odps-partition-preview in repository https://gitbox.apache.org/repos/asf/superset.git
commit 3a7e8d4839f491e4743d99914ebfc51acd172d81 Author: Evan Rusackas <[email protected]> AuthorDate: Sun Feb 22 20:27:37 2026 -0800 fix: ODPS (MaxCompute) data source table preview failed When using the ODPS (MaxCompute) data source, previewing partitioned tables in SQLLab would fail because ODPS requires a partition to be specified for partition tables. This PR adds ODPS-specific handling: - New OdpsEngineSpec with partition detection support - Modified select_star to add partition filter for ODPS partition tables - New Partition dataclass in sql/parse.py - New is_odps_partitioned_table method in DatabaseDAO Closes #32301 Co-Authored-By: zhutong6688 <[email protected]> Co-Authored-By: Claude Opus 4.5 <[email protected]> --- requirements/base.in | 3 + requirements/base.txt | 1 + requirements/development.txt | 1 + superset/daos/database.py | 39 +++++ superset/databases/api.py | 17 +- superset/db_engine_specs/odps.py | 191 +++++++++++++++++++++ superset/sql/parse.py | 31 ++++ .../db_engine_specs/base_engine_spec_tests.py | 7 +- 8 files changed, 284 insertions(+), 6 deletions(-) diff --git a/requirements/base.in b/requirements/base.in index deca6a557b0..a427aa425cd 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -48,3 +48,6 @@ openapi-schema-validator>=0.6.3 # Known affected packages: Preset's 'clients' package # See docs/docs/contributing/pkg-resources-migration.md for details setuptools<81 + +# ODPS (MaxCompute) data source support +pyodps>=0.12.2 diff --git a/requirements/base.txt b/requirements/base.txt index 7aaa7bfeb4c..9d36762bd4d 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -485,3 +485,4 @@ xlsxwriter==3.0.9 # pandas zstandard==0.23.0 # via flask-compress +pyodps==0.12.2 diff --git a/requirements/development.txt b/requirements/development.txt index 1e7b0f41814..9bb73a5a2ae 100644 --- a/requirements/development.txt +++ b/requirements/development.txt @@ -1157,3 +1157,4 @@ zstandard==0.23.0 # via # -c requirements/base-constraint.txt # flask-compress +pyodps==0.12.2 diff --git a/superset/daos/database.py b/superset/daos/database.py index cd1bc3d51b3..095986a7a47 100644 --- a/superset/daos/database.py +++ b/superset/daos/database.py @@ -17,8 +17,11 @@ from __future__ import annotations import logging +import re from typing import Any +from urllib.parse import unquote +from odps import ODPS from sqlalchemy.orm import joinedload from superset import is_feature_enabled @@ -239,6 +242,42 @@ class DatabaseDAO(BaseDAO[Database]): .all() ) + @classmethod + def is_odps_partitioned_table( + cls, database: Database, table_name: str + ) -> tuple[bool, list[str]]: + """ + This function is used to determine and retrieve + partition information of the ODPS table. + The return values are whether the partition + table is partitioned and the names of all partition fields. + """ + if not database: + raise ValueError("Database not found") + uri = database.sqlalchemy_uri + access_key = database.password + pattern = re.compile( + r"odps://(?P<username>[^:]+):(?P<password>[^@]+)@(?P<project>[^/]+)/(?:\?" + r"endpoint=(?P<endpoint>[^&]+))" + ) + if not uri or not isinstance(uri, str): + logger.warning( + "Invalid or missing sqlalchemy URI, please provide a correct URI" + ) + return False, [] + if match := pattern.match(unquote(uri)): + access_id = match.group("username") + project = match.group("project") + endpoint = match.group("endpoint") + odps_client = ODPS(access_id, access_key, project, endpoint=endpoint) + table = odps_client.get_table(table_name) + if table.exist_partition: + partition_spec = table.table_schema.partitions + partition_fields = [partition.name for partition in partition_spec] + return True, partition_fields + return False, [] + return False, [] + class DatabaseUserOAuth2TokensDAO(BaseDAO[DatabaseUserOAuth2Tokens]): """ diff --git a/superset/databases/api.py b/superset/databases/api.py index e9178d1f24e..4f6e2fa48a3 100644 --- a/superset/databases/api.py +++ b/superset/databases/api.py @@ -111,6 +111,7 @@ from superset.databases.schemas import ( ) from superset.databases.utils import get_table_metadata from superset.db_engine_specs import get_available_engine_specs +from superset.db_engine_specs.odps import OdpsEngineSpec from superset.errors import ErrorLevel, SupersetError, SupersetErrorType from superset.exceptions import ( DatabaseNotFoundException, @@ -123,7 +124,7 @@ from superset.exceptions import ( ) from superset.extensions import security_manager from superset.models.core import Database -from superset.sql.parse import Table +from superset.sql.parse import Partition, Table from superset.superset_typing import FlaskResponse from superset.utils import json from superset.utils.core import ( @@ -1079,15 +1080,21 @@ class DatabaseRestApi(BaseSupersetModelRestApi): parameters = QualifiedTableSchema().load(request.args) except ValidationError as ex: raise InvalidPayloadSchemaError(ex) from ex - - table = Table(parameters["name"], parameters["schema"], parameters["catalog"]) + table_name = str(parameters["name"]) + table = Table(table_name, parameters["schema"], parameters["catalog"]) + is_partitioned_table, partition_fields = DatabaseDAO.is_odps_partitioned_table( + database, table_name + ) try: security_manager.raise_for_access(database=database, table=table) except SupersetSecurityException as ex: # instead of raising 403, raise 404 to hide table existence raise TableNotFoundException("No such table") from ex - - payload = database.db_engine_spec.get_table_metadata(database, table) + partition = Partition(is_partitioned_table, partition_fields) + if is_partitioned_table: + payload = OdpsEngineSpec.get_table_metadata(database, table, partition) + else: + payload = database.db_engine_spec.get_table_metadata(database, table) return self.response(200, **payload) diff --git a/superset/db_engine_specs/odps.py b/superset/db_engine_specs/odps.py new file mode 100644 index 00000000000..2c334818279 --- /dev/null +++ b/superset/db_engine_specs/odps.py @@ -0,0 +1,191 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import logging +from typing import Any, Optional, TYPE_CHECKING + +from sqlalchemy import select, text +from sqlalchemy.engine.base import Engine + +from superset.databases.schemas import ( + TableMetadataColumnsResponse, + TableMetadataResponse, +) +from superset.databases.utils import ( + get_col_type, + get_foreign_keys_metadata, + get_indexes_metadata, +) +from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin +from superset.sql.parse import Partition, SQLScript +from superset.sql_parse import Table +from superset.superset_typing import ResultSetColumnType + +if TYPE_CHECKING: + from superset.models.core import Database + +logger = logging.getLogger(__name__) + + +class OdpsBaseEngineSpec(BaseEngineSpec): + @classmethod + def get_table_metadata( + cls, + database: Database, + table: Table, + partition: Optional[Partition] = None, + ) -> TableMetadataResponse: + """ + Returns basic table metadata + :param database: Database instance + :param table: A Table instance + :param partition: A Table partition info + :return: Basic table metadata + """ + return cls.get_table_metadata(database, table, partition) + + +class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec): + default_driver = "odps" + + @classmethod + def get_table_metadata( + cls, database: Any, table: Table, partition: Optional[Partition] = None + ) -> TableMetadataResponse: + """ + Get table metadata information, including type, pk, fks. + This function raises SQLAlchemyError when a schema is not found. + + :param partition: The table's partition info + :param database: The database model + :param table: Table instance + :return: Dict table metadata ready for API response + """ + keys = [] + columns = database.get_columns(table) + primary_key = database.get_pk_constraint(table) + if primary_key and primary_key.get("constrained_columns"): + primary_key["column_names"] = primary_key.pop("constrained_columns") + primary_key["type"] = "pk" + keys += [primary_key] + foreign_keys = get_foreign_keys_metadata(database, table) + indexes = get_indexes_metadata(database, table) + keys += foreign_keys + indexes + payload_columns: list[TableMetadataColumnsResponse] = [] + table_comment = database.get_table_comment(table) + for col in columns: + dtype = get_col_type(col) + payload_columns.append( + { + "name": col["column_name"], + "type": dtype.split("(")[0] if "(" in dtype else dtype, + "longType": dtype, + "keys": [ + k for k in keys if col["column_name"] in k["column_names"] + ], + "comment": col.get("comment"), + } + ) + + with database.get_sqla_engine( + catalog=table.catalog, schema=table.schema + ) as engine: + return { + "name": table.table, + "columns": payload_columns, + "selectStar": cls.select_star( + database=database, + table=table, + engine=engine, + limit=100, + show_cols=False, + indent=True, + latest_partition=True, + cols=columns, + partition=partition, + ), + "primaryKey": primary_key, + "foreignKeys": foreign_keys, + "indexes": keys, + "comment": table_comment, + } + + @classmethod + def select_star( # pylint: disable=too-many-arguments + cls, + database: Database, + table: Table, + engine: Engine, + limit: int = 100, + show_cols: bool = False, + indent: bool = True, + latest_partition: bool = True, + cols: list[ResultSetColumnType] | None = None, + partition: Optional[Partition] = None, + ) -> str: + """ + Generate a "SELECT * from [schema.]table_name" query with appropriate limit. + + WARNING: expects only unquoted table and schema names. + + :param partition: The table's partition info + :param database: Database instance + :param table: Table instance + :param engine: SqlAlchemy Engine instance + :param limit: limit to impose on query + :param show_cols: Show columns in query; otherwise use "*" + :param indent: Add indentation to query + :param latest_partition: Only query the latest partition + :param cols: Columns to include in query + :return: SQL query + """ + # pylint: disable=redefined-outer-name + fields: str | list[Any] = "*" + cols = cols or [] + if (show_cols or latest_partition) and not cols: + cols = database.get_columns(table) + + if show_cols: + fields = cls._get_fields(cols) + full_table_name = cls.quote_table(table, engine.dialect) + qry = select(fields).select_from(text(full_table_name)) + if database.backend == "odps": + if ( + partition is not None + and partition.is_partitioned_table + and partition.partition_column is not None + and len(partition.partition_column) > 0 + ): + partition_str = partition.partition_column[0] + partition_str_where = f"CAST({partition_str} AS STRING) LIKE '%'" + qry = qry.where(text(partition_str_where)) + if limit: + qry = qry.limit(limit) + if latest_partition: + partition_query = cls.where_latest_partition( + database, + table, + qry, + columns=cols, + ) + if partition_query is not None: + qry = partition_query + sql = database.compile_sqla_query(qry, table.catalog, table.schema) + if indent: + sql = SQLScript(sql, engine=cls.engine).format() + return sql diff --git a/superset/sql/parse.py b/superset/sql/parse.py index fdbc524b149..ad403962abd 100644 --- a/superset/sql/parse.py +++ b/superset/sql/parse.py @@ -321,6 +321,37 @@ class Table: ) +@dataclass(eq=True, frozen=True) +class Partition: + """ + Partition object, with two attribute keys: + ispartitioned_table and partition_comlumn, + used to provide partition information + Here is an example of an object: + {"ispartitioned_table":true,"partition_column":["month","day"]} + """ + + is_partitioned_table: bool + partition_column: list[str] | None = None + + def __str__(self) -> str: + """ + Return the partition columns of table name. + """ + partition_column_str = ( + ", ".join(map(str, self.partition_column)) + if self.partition_column + else "None" + ) + return ( + f"Partition(is_partitioned_table={self.is_partitioned_table}, " + f"partition_column=[{partition_column_str}])" + ) + + def __eq__(self, other: Any) -> bool: + return str(self) == str(other) + + # To avoid unnecessary parsing/formatting of queries, the statement has the concept of # an "internal representation", which is the AST of the SQL statement. For most of the # engines supported by Superset this is `sqlglot.exp.Expression`, but there is a special diff --git a/tests/integration_tests/db_engine_specs/base_engine_spec_tests.py b/tests/integration_tests/db_engine_specs/base_engine_spec_tests.py index 122bb271a28..d36e75e08e3 100644 --- a/tests/integration_tests/db_engine_specs/base_engine_spec_tests.py +++ b/tests/integration_tests/db_engine_specs/base_engine_spec_tests.py @@ -27,6 +27,7 @@ from superset.db_engine_specs.base import ( builtin_time_grains, ) from superset.db_engine_specs.mysql import MySQLEngineSpec +from superset.db_engine_specs.odps import OdpsBaseEngineSpec, OdpsEngineSpec from superset.db_engine_specs.sqlite import SqliteEngineSpec from superset.errors import ErrorLevel, SupersetError, SupersetErrorType from superset.sql.parse import Table @@ -80,7 +81,11 @@ class SupersetTestCases(SupersetTestCase): time_grains = set(builtin_time_grains.keys()) # loop over all subclasses of BaseEngineSpec for engine in load_engine_specs(): - if engine is not BaseEngineSpec: + if ( + engine is not BaseEngineSpec + and engine is not OdpsBaseEngineSpec + and engine is not OdpsEngineSpec + ): # make sure time grain functions have been defined assert len(engine.get_time_grain_expressions()) > 0 # make sure all defined time grains are supported
