This is an automated email from the ASF dual-hosted git repository.

maximebeauchemin pushed a commit to branch bigquery_partitions
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 4824dbb1e87d790483b4a8d0458364ec23aee853
Author: Maxime Beauchemin <[email protected]>
AuthorDate: Wed Oct 30 19:30:10 2024 -0700

    feat(sqllab): add latest partition support for BigQuery
    
    Adding db_engine_spec-related features that enables SQL Lab to show the 
latest partition when using time partitioning in BigQuery as well as applying a 
WHERE clause on the latest partition by default when fetching the sample 
dataset. Turns out that `SELECT * FROM {{...}} LIMIT {n}` can be costly against 
large tables in BigQuery as it results in a full table scan.
---
 docker/pythonpath_dev/superset_config.py |   1 +
 superset/db_engine_specs/base.py         |  15 ----
 superset/db_engine_specs/bigquery.py     | 123 +++++++++++++++++++------------
 3 files changed, 77 insertions(+), 62 deletions(-)

diff --git a/docker/pythonpath_dev/superset_config.py 
b/docker/pythonpath_dev/superset_config.py
index e8223e5358..03b20cc5be 100644
--- a/docker/pythonpath_dev/superset_config.py
+++ b/docker/pythonpath_dev/superset_config.py
@@ -40,6 +40,7 @@ EXAMPLES_PASSWORD = os.getenv("EXAMPLES_PASSWORD")
 EXAMPLES_HOST = os.getenv("EXAMPLES_HOST")
 EXAMPLES_PORT = os.getenv("EXAMPLES_PORT")
 EXAMPLES_DB = os.getenv("EXAMPLES_DB")
+SHOW_STACKTRACE = True
 
 # The SQLAlchemy connection string.
 SQLALCHEMY_DATABASE_URI = (
diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py
index a086f6eff7..b3dd91cb78 100644
--- a/superset/db_engine_specs/base.py
+++ b/superset/db_engine_specs/base.py
@@ -41,7 +41,6 @@ import requests
 import sqlparse
 from apispec import APISpec
 from apispec.ext.marshmallow import MarshmallowPlugin
-from deprecation import deprecated
 from flask import current_app, g, url_for
 from flask_appbuilder.security.sqla.models import User
 from flask_babel import gettext as __, lazy_gettext as _
@@ -1058,19 +1057,6 @@ class BaseEngineSpec:  # pylint: 
disable=too-many-public-methods
             return type_code.upper()
         return None
 
-    @classmethod
-    @deprecated(deprecated_in="3.0")
-    def normalize_indexes(cls, indexes: list[dict[str, Any]]) -> 
list[dict[str, Any]]:
-        """
-        Normalizes indexes for more consistency across db engines
-
-        noop by default
-
-        :param indexes: Raw indexes as returned by SQLAlchemy
-        :return: cleaner, more aligned index definition
-        """
-        return indexes
-
     @classmethod
     def get_table_metadata(
         cls,
@@ -1637,7 +1623,6 @@ class BaseEngineSpec:  # pylint: 
disable=too-many-public-methods
         :return: SqlAlchemy query with additional where clause referencing the 
latest
         partition
         """
-        # TODO: Fix circular import caused by importing Database, TableColumn
         return None
 
     @classmethod
diff --git a/superset/db_engine_specs/bigquery.py 
b/superset/db_engine_specs/bigquery.py
index 70bc4bc845..58f2f1cdb2 100644
--- a/superset/db_engine_specs/bigquery.py
+++ b/superset/db_engine_specs/bigquery.py
@@ -17,20 +17,21 @@
 
 from __future__ import annotations
 
+import logging
 import re
 import urllib
 from datetime import datetime
 from re import Pattern
+from textwrap import dedent
 from typing import Any, TYPE_CHECKING, TypedDict
 
 import pandas as pd
 from apispec import APISpec
 from apispec.ext.marshmallow import MarshmallowPlugin
-from deprecation import deprecated
 from flask_babel import gettext as __
 from marshmallow import fields, Schema
 from marshmallow.exceptions import ValidationError
-from sqlalchemy import column, types
+from sqlalchemy import column, func, types
 from sqlalchemy.engine.base import Engine
 from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy.engine.url import URL
@@ -49,6 +50,11 @@ from superset.superset_typing import ResultSetColumnType
 from superset.utils import core as utils, json
 from superset.utils.hashing import md5_sha_from_str
 
+if TYPE_CHECKING:
+    from sqlalchemy.sql.expression import Select
+
+logger = logging.getLogger(__name__)
+
 try:
     from google.cloud import bigquery
     from google.oauth2 import service_account
@@ -284,42 +290,51 @@ class BigQueryEngineSpec(BaseEngineSpec):  # pylint: 
disable=too-many-public-met
         return "_" + md5_sha_from_str(label)
 
     @classmethod
-    @deprecated(deprecated_in="3.0")
-    def normalize_indexes(cls, indexes: list[dict[str, Any]]) -> 
list[dict[str, Any]]:
-        """
-        Normalizes indexes for more consistency across db engines
+    def where_latest_partition(
+        cls,
+        database: Database,
+        table: Table,
+        query: Select,
+        columns: list[ResultSetColumnType] | None = None,
+    ) -> Select | None:
+        if partition_column := cls.get_time_partition_column(database, table):
+            max_partition_id = cls.get_max_partition_id(database, table)
+            query = query.where(
+                column(partition_column) == func.PARSE_DATE("%Y%m%d", 
max_partition_id)
+            )
 
-        :param indexes: Raw indexes as returned by SQLAlchemy
-        :return: cleaner, more aligned index definition
-        """
-        normalized_idxs = []
-        # Fixing a bug/behavior observed in pybigquery==0.4.15 where
-        # the index's `column_names` == [None]
-        # Here we're returning only non-None indexes
-        for ix in indexes:
-            column_names = ix.get("column_names") or []
-            ix["column_names"] = [col for col in column_names if col is not 
None]
-            if ix["column_names"]:
-                normalized_idxs.append(ix)
-        return normalized_idxs
+        return query
 
     @classmethod
-    def get_indexes(
+    def get_max_partition_id(
         cls,
         database: Database,
-        inspector: Inspector,
         table: Table,
-    ) -> list[dict[str, Any]]:
-        """
-        Get the indexes associated with the specified schema/table.
+    ) -> Select | None:
+        sql = dedent(f"""\
+            SELECT
+                MAX(partition_id) AS max_partition_id
+            FROM `{table.schema}.INFORMATION_SCHEMA.PARTITIONS`
+            WHERE table_name = '{table.table}'
+        """)
+        df = database.get_df(sql)
+        return df.iat[0, 0]
 
-        :param database: The database to inspect
-        :param inspector: The SQLAlchemy inspector
-        :param table: The table instance to inspect
-        :returns: The indexes
-        """
+    @classmethod
+    def get_time_partition_column(
+        cls,
+        database: Database,
+        table: Table,
+    ) -> str | None:
+        with cls.get_engine(
+            database, catalog=table.catalog, schema=table.schema
+        ) as engine:
+            client = cls._get_client(engine, database)
+            bq_table = client.get_table(f"{table.schema}.{table.table}")
 
-        return cls.normalize_indexes(inspector.get_indexes(table.table, 
table.schema))
+            if bq_table.time_partitioning is not None:
+                return bq_table.time_partitioning.field
+        return None
 
     @classmethod
     def get_extra_table_metadata(
@@ -327,23 +342,37 @@ class BigQueryEngineSpec(BaseEngineSpec):  # pylint: 
disable=too-many-public-met
         database: Database,
         table: Table,
     ) -> dict[str, Any]:
-        indexes = database.get_indexes(table)
-        if not indexes:
-            return {}
-        partitions_columns = [
-            index.get("column_names", [])
-            for index in indexes
-            if index.get("name") == "partition"
-        ]
-        cluster_columns = [
-            index.get("column_names", [])
-            for index in indexes
-            if index.get("name") == "clustering"
-        ]
-        return {
-            "partitions": {"cols": partitions_columns},
-            "clustering": {"cols": cluster_columns},
-        }
+        payload = {}
+        partition_column = cls.get_time_partition_column(database, table)
+        with cls.get_engine(
+            database, catalog=table.catalog, schema=table.schema
+        ) as engine:
+            if partition_column:
+                max_partition_id = cls.get_max_partition_id(database, table)
+                payload.update(
+                    {
+                        "partitions": {
+                            "cols": [partition_column],
+                            "latest": {"ds": max_partition_id},
+                        },
+                        "partitionQuery": cls.select_star(
+                            database,
+                            table,
+                            engine,
+                            indent=False,
+                            show_cols=False,
+                            latest_partition=True,
+                        ),
+                        "indexes": [
+                            {
+                                "name": "partitioned",
+                                "cols": [partition_column],
+                                "type": "partitioned",
+                            }
+                        ],
+                    }
+                )
+        return payload
 
     @classmethod
     def epoch_to_dttm(cls) -> str:

Reply via email to