This is an automated email from the ASF dual-hosted git repository.

rusackas pushed a commit to branch rusackas/fix-odps-partition-preview
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 3a7e8d4839f491e4743d99914ebfc51acd172d81
Author: Evan Rusackas <[email protected]>
AuthorDate: Sun Feb 22 20:27:37 2026 -0800

    fix: ODPS (MaxCompute) data source table preview failed
    
    When using the ODPS (MaxCompute) data source, previewing partitioned
    tables in SQLLab would fail because ODPS requires a partition to be
    specified for partition tables.
    
    This PR adds ODPS-specific handling:
    - New OdpsEngineSpec with partition detection support
    - Modified select_star to add partition filter for ODPS partition tables
    - New Partition dataclass in sql/parse.py
    - New is_odps_partitioned_table method in DatabaseDAO
    
    Closes #32301
    
    Co-Authored-By: zhutong6688 <[email protected]>
    Co-Authored-By: Claude Opus 4.5 <[email protected]>
---
 requirements/base.in                               |   3 +
 requirements/base.txt                              |   1 +
 requirements/development.txt                       |   1 +
 superset/daos/database.py                          |  39 +++++
 superset/databases/api.py                          |  17 +-
 superset/db_engine_specs/odps.py                   | 191 +++++++++++++++++++++
 superset/sql/parse.py                              |  31 ++++
 .../db_engine_specs/base_engine_spec_tests.py      |   7 +-
 8 files changed, 284 insertions(+), 6 deletions(-)

diff --git a/requirements/base.in b/requirements/base.in
index deca6a557b0..a427aa425cd 100644
--- a/requirements/base.in
+++ b/requirements/base.in
@@ -48,3 +48,6 @@ openapi-schema-validator>=0.6.3
 # Known affected packages: Preset's 'clients' package
 # See docs/docs/contributing/pkg-resources-migration.md for details
 setuptools<81
+
+# ODPS (MaxCompute) data source support
+pyodps>=0.12.2
diff --git a/requirements/base.txt b/requirements/base.txt
index 7aaa7bfeb4c..9d36762bd4d 100644
--- a/requirements/base.txt
+++ b/requirements/base.txt
@@ -485,3 +485,4 @@ xlsxwriter==3.0.9
     #   pandas
 zstandard==0.23.0
     # via flask-compress
+pyodps==0.12.2
diff --git a/requirements/development.txt b/requirements/development.txt
index 1e7b0f41814..9bb73a5a2ae 100644
--- a/requirements/development.txt
+++ b/requirements/development.txt
@@ -1157,3 +1157,4 @@ zstandard==0.23.0
     # via
     #   -c requirements/base-constraint.txt
     #   flask-compress
+pyodps==0.12.2
diff --git a/superset/daos/database.py b/superset/daos/database.py
index cd1bc3d51b3..095986a7a47 100644
--- a/superset/daos/database.py
+++ b/superset/daos/database.py
@@ -17,8 +17,11 @@
 from __future__ import annotations
 
 import logging
+import re
 from typing import Any
+from urllib.parse import unquote
 
+from odps import ODPS
 from sqlalchemy.orm import joinedload
 
 from superset import is_feature_enabled
@@ -239,6 +242,42 @@ class DatabaseDAO(BaseDAO[Database]):
             .all()
         )
 
+    @classmethod
+    def is_odps_partitioned_table(
+        cls, database: Database, table_name: str
+    ) -> tuple[bool, list[str]]:
+        """
+        This function is used to determine and retrieve
+        partition information of the ODPS table.
+        The return values are whether the partition
+        table is partitioned and the names of all partition fields.
+        """
+        if not database:
+            raise ValueError("Database not found")
+        uri = database.sqlalchemy_uri
+        access_key = database.password
+        pattern = re.compile(
+            
r"odps://(?P<username>[^:]+):(?P<password>[^@]+)@(?P<project>[^/]+)/(?:\?"
+            r"endpoint=(?P<endpoint>[^&]+))"
+        )
+        if not uri or not isinstance(uri, str):
+            logger.warning(
+                "Invalid or missing sqlalchemy URI, please provide a correct 
URI"
+            )
+            return False, []
+        if match := pattern.match(unquote(uri)):
+            access_id = match.group("username")
+            project = match.group("project")
+            endpoint = match.group("endpoint")
+            odps_client = ODPS(access_id, access_key, project, 
endpoint=endpoint)
+            table = odps_client.get_table(table_name)
+            if table.exist_partition:
+                partition_spec = table.table_schema.partitions
+                partition_fields = [partition.name for partition in 
partition_spec]
+                return True, partition_fields
+            return False, []
+        return False, []
+
 
 class DatabaseUserOAuth2TokensDAO(BaseDAO[DatabaseUserOAuth2Tokens]):
     """
diff --git a/superset/databases/api.py b/superset/databases/api.py
index e9178d1f24e..4f6e2fa48a3 100644
--- a/superset/databases/api.py
+++ b/superset/databases/api.py
@@ -111,6 +111,7 @@ from superset.databases.schemas import (
 )
 from superset.databases.utils import get_table_metadata
 from superset.db_engine_specs import get_available_engine_specs
+from superset.db_engine_specs.odps import OdpsEngineSpec
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
 from superset.exceptions import (
     DatabaseNotFoundException,
@@ -123,7 +124,7 @@ from superset.exceptions import (
 )
 from superset.extensions import security_manager
 from superset.models.core import Database
-from superset.sql.parse import Table
+from superset.sql.parse import Partition, Table
 from superset.superset_typing import FlaskResponse
 from superset.utils import json
 from superset.utils.core import (
@@ -1079,15 +1080,21 @@ class DatabaseRestApi(BaseSupersetModelRestApi):
             parameters = QualifiedTableSchema().load(request.args)
         except ValidationError as ex:
             raise InvalidPayloadSchemaError(ex) from ex
-
-        table = Table(parameters["name"], parameters["schema"], 
parameters["catalog"])
+        table_name = str(parameters["name"])
+        table = Table(table_name, parameters["schema"], parameters["catalog"])
+        is_partitioned_table, partition_fields = 
DatabaseDAO.is_odps_partitioned_table(
+            database, table_name
+        )
         try:
             security_manager.raise_for_access(database=database, table=table)
         except SupersetSecurityException as ex:
             # instead of raising 403, raise 404 to hide table existence
             raise TableNotFoundException("No such table") from ex
-
-        payload = database.db_engine_spec.get_table_metadata(database, table)
+        partition = Partition(is_partitioned_table, partition_fields)
+        if is_partitioned_table:
+            payload = OdpsEngineSpec.get_table_metadata(database, table, 
partition)
+        else:
+            payload = database.db_engine_spec.get_table_metadata(database, 
table)
 
         return self.response(200, **payload)
 
diff --git a/superset/db_engine_specs/odps.py b/superset/db_engine_specs/odps.py
new file mode 100644
index 00000000000..2c334818279
--- /dev/null
+++ b/superset/db_engine_specs/odps.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import logging
+from typing import Any, Optional, TYPE_CHECKING
+
+from sqlalchemy import select, text
+from sqlalchemy.engine.base import Engine
+
+from superset.databases.schemas import (
+    TableMetadataColumnsResponse,
+    TableMetadataResponse,
+)
+from superset.databases.utils import (
+    get_col_type,
+    get_foreign_keys_metadata,
+    get_indexes_metadata,
+)
+from superset.db_engine_specs.base import BaseEngineSpec, BasicParametersMixin
+from superset.sql.parse import Partition, SQLScript
+from superset.sql_parse import Table
+from superset.superset_typing import ResultSetColumnType
+
+if TYPE_CHECKING:
+    from superset.models.core import Database
+
+logger = logging.getLogger(__name__)
+
+
+class OdpsBaseEngineSpec(BaseEngineSpec):
+    @classmethod
+    def get_table_metadata(
+        cls,
+        database: Database,
+        table: Table,
+        partition: Optional[Partition] = None,
+    ) -> TableMetadataResponse:
+        """
+        Returns basic table metadata
+        :param database: Database instance
+        :param table: A Table instance
+        :param partition: A Table partition info
+        :return: Basic table metadata
+        """
+        return cls.get_table_metadata(database, table, partition)
+
+
+class OdpsEngineSpec(BasicParametersMixin, OdpsBaseEngineSpec):
+    default_driver = "odps"
+
+    @classmethod
+    def get_table_metadata(
+        cls, database: Any, table: Table, partition: Optional[Partition] = None
+    ) -> TableMetadataResponse:
+        """
+        Get table metadata information, including type, pk, fks.
+        This function raises SQLAlchemyError when a schema is not found.
+
+        :param partition: The table's partition info
+        :param database: The database model
+        :param table: Table instance
+        :return: Dict table metadata ready for API response
+        """
+        keys = []
+        columns = database.get_columns(table)
+        primary_key = database.get_pk_constraint(table)
+        if primary_key and primary_key.get("constrained_columns"):
+            primary_key["column_names"] = 
primary_key.pop("constrained_columns")
+            primary_key["type"] = "pk"
+            keys += [primary_key]
+        foreign_keys = get_foreign_keys_metadata(database, table)
+        indexes = get_indexes_metadata(database, table)
+        keys += foreign_keys + indexes
+        payload_columns: list[TableMetadataColumnsResponse] = []
+        table_comment = database.get_table_comment(table)
+        for col in columns:
+            dtype = get_col_type(col)
+            payload_columns.append(
+                {
+                    "name": col["column_name"],
+                    "type": dtype.split("(")[0] if "(" in dtype else dtype,
+                    "longType": dtype,
+                    "keys": [
+                        k for k in keys if col["column_name"] in 
k["column_names"]
+                    ],
+                    "comment": col.get("comment"),
+                }
+            )
+
+        with database.get_sqla_engine(
+            catalog=table.catalog, schema=table.schema
+        ) as engine:
+            return {
+                "name": table.table,
+                "columns": payload_columns,
+                "selectStar": cls.select_star(
+                    database=database,
+                    table=table,
+                    engine=engine,
+                    limit=100,
+                    show_cols=False,
+                    indent=True,
+                    latest_partition=True,
+                    cols=columns,
+                    partition=partition,
+                ),
+                "primaryKey": primary_key,
+                "foreignKeys": foreign_keys,
+                "indexes": keys,
+                "comment": table_comment,
+            }
+
+    @classmethod
+    def select_star(  # pylint: disable=too-many-arguments
+        cls,
+        database: Database,
+        table: Table,
+        engine: Engine,
+        limit: int = 100,
+        show_cols: bool = False,
+        indent: bool = True,
+        latest_partition: bool = True,
+        cols: list[ResultSetColumnType] | None = None,
+        partition: Optional[Partition] = None,
+    ) -> str:
+        """
+        Generate a "SELECT * from [schema.]table_name" query with appropriate 
limit.
+
+        WARNING: expects only unquoted table and schema names.
+
+        :param partition: The table's partition info
+        :param database: Database instance
+        :param table: Table instance
+        :param engine: SqlAlchemy Engine instance
+        :param limit: limit to impose on query
+        :param show_cols: Show columns in query; otherwise use "*"
+        :param indent: Add indentation to query
+        :param latest_partition: Only query the latest partition
+        :param cols: Columns to include in query
+        :return: SQL query
+        """
+        # pylint: disable=redefined-outer-name
+        fields: str | list[Any] = "*"
+        cols = cols or []
+        if (show_cols or latest_partition) and not cols:
+            cols = database.get_columns(table)
+
+        if show_cols:
+            fields = cls._get_fields(cols)
+        full_table_name = cls.quote_table(table, engine.dialect)
+        qry = select(fields).select_from(text(full_table_name))
+        if database.backend == "odps":
+            if (
+                partition is not None
+                and partition.is_partitioned_table
+                and partition.partition_column is not None
+                and len(partition.partition_column) > 0
+            ):
+                partition_str = partition.partition_column[0]
+                partition_str_where = f"CAST({partition_str} AS STRING) LIKE 
'%'"
+                qry = qry.where(text(partition_str_where))
+        if limit:
+            qry = qry.limit(limit)
+        if latest_partition:
+            partition_query = cls.where_latest_partition(
+                database,
+                table,
+                qry,
+                columns=cols,
+            )
+            if partition_query is not None:
+                qry = partition_query
+        sql = database.compile_sqla_query(qry, table.catalog, table.schema)
+        if indent:
+            sql = SQLScript(sql, engine=cls.engine).format()
+        return sql
diff --git a/superset/sql/parse.py b/superset/sql/parse.py
index fdbc524b149..ad403962abd 100644
--- a/superset/sql/parse.py
+++ b/superset/sql/parse.py
@@ -321,6 +321,37 @@ class Table:
         )
 
 
+@dataclass(eq=True, frozen=True)
+class Partition:
+    """
+    Partition object, with two attribute keys:
+    ispartitioned_table and partition_comlumn,
+    used to provide partition information
+    Here is an example of an object:
+    {"ispartitioned_table":true,"partition_column":["month","day"]}
+    """
+
+    is_partitioned_table: bool
+    partition_column: list[str] | None = None
+
+    def __str__(self) -> str:
+        """
+        Return the partition columns of table name.
+        """
+        partition_column_str = (
+            ", ".join(map(str, self.partition_column))
+            if self.partition_column
+            else "None"
+        )
+        return (
+            f"Partition(is_partitioned_table={self.is_partitioned_table}, "
+            f"partition_column=[{partition_column_str}])"
+        )
+
+    def __eq__(self, other: Any) -> bool:
+        return str(self) == str(other)
+
+
 # To avoid unnecessary parsing/formatting of queries, the statement has the 
concept of
 # an "internal representation", which is the AST of the SQL statement. For 
most of the
 # engines supported by Superset this is `sqlglot.exp.Expression`, but there is 
a special
diff --git a/tests/integration_tests/db_engine_specs/base_engine_spec_tests.py 
b/tests/integration_tests/db_engine_specs/base_engine_spec_tests.py
index 122bb271a28..d36e75e08e3 100644
--- a/tests/integration_tests/db_engine_specs/base_engine_spec_tests.py
+++ b/tests/integration_tests/db_engine_specs/base_engine_spec_tests.py
@@ -27,6 +27,7 @@ from superset.db_engine_specs.base import (
     builtin_time_grains,
 )
 from superset.db_engine_specs.mysql import MySQLEngineSpec
+from superset.db_engine_specs.odps import OdpsBaseEngineSpec, OdpsEngineSpec
 from superset.db_engine_specs.sqlite import SqliteEngineSpec
 from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
 from superset.sql.parse import Table
@@ -80,7 +81,11 @@ class SupersetTestCases(SupersetTestCase):
         time_grains = set(builtin_time_grains.keys())
         # loop over all subclasses of BaseEngineSpec
         for engine in load_engine_specs():
-            if engine is not BaseEngineSpec:
+            if (
+                engine is not BaseEngineSpec
+                and engine is not OdpsBaseEngineSpec
+                and engine is not OdpsEngineSpec
+            ):
                 # make sure time grain functions have been defined
                 assert len(engine.get_time_grain_expressions()) > 0
                 # make sure all defined time grains are supported

Reply via email to