This is an automated email from the ASF dual-hosted git repository. maximebeauchemin pushed a commit to branch bigquery_partitions in repository https://gitbox.apache.org/repos/asf/superset.git
commit 4824dbb1e87d790483b4a8d0458364ec23aee853 Author: Maxime Beauchemin <[email protected]> AuthorDate: Wed Oct 30 19:30:10 2024 -0700 feat(sqllab): add latest partition support for BigQuery Adding db_engine_spec-related features that enable SQL Lab to show the latest partition when using time partitioning in BigQuery as well as applying a WHERE clause on the latest partition by default when fetching the sample dataset. It turns out that `SELECT * FROM {{...}} LIMIT {n}` can be costly against large tables in BigQuery as it results in a full table scan. --- docker/pythonpath_dev/superset_config.py | 1 + superset/db_engine_specs/base.py | 15 ---- superset/db_engine_specs/bigquery.py | 123 +++++++++++++++++++------------ 3 files changed, 77 insertions(+), 62 deletions(-) diff --git a/docker/pythonpath_dev/superset_config.py b/docker/pythonpath_dev/superset_config.py index e8223e5358..03b20cc5be 100644 --- a/docker/pythonpath_dev/superset_config.py +++ b/docker/pythonpath_dev/superset_config.py @@ -40,6 +40,7 @@ EXAMPLES_PASSWORD = os.getenv("EXAMPLES_PASSWORD") EXAMPLES_HOST = os.getenv("EXAMPLES_HOST") EXAMPLES_PORT = os.getenv("EXAMPLES_PORT") EXAMPLES_DB = os.getenv("EXAMPLES_DB") +SHOW_STACKTRACE = True # The SQLAlchemy connection string. 
SQLALCHEMY_DATABASE_URI = ( diff --git a/superset/db_engine_specs/base.py b/superset/db_engine_specs/base.py index a086f6eff7..b3dd91cb78 100644 --- a/superset/db_engine_specs/base.py +++ b/superset/db_engine_specs/base.py @@ -41,7 +41,6 @@ import requests import sqlparse from apispec import APISpec from apispec.ext.marshmallow import MarshmallowPlugin -from deprecation import deprecated from flask import current_app, g, url_for from flask_appbuilder.security.sqla.models import User from flask_babel import gettext as __, lazy_gettext as _ @@ -1058,19 +1057,6 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods return type_code.upper() return None - @classmethod - @deprecated(deprecated_in="3.0") - def normalize_indexes(cls, indexes: list[dict[str, Any]]) -> list[dict[str, Any]]: - """ - Normalizes indexes for more consistency across db engines - - noop by default - - :param indexes: Raw indexes as returned by SQLAlchemy - :return: cleaner, more aligned index definition - """ - return indexes - @classmethod def get_table_metadata( cls, @@ -1637,7 +1623,6 @@ class BaseEngineSpec: # pylint: disable=too-many-public-methods :return: SqlAlchemy query with additional where clause referencing the latest partition """ - # TODO: Fix circular import caused by importing Database, TableColumn return None @classmethod diff --git a/superset/db_engine_specs/bigquery.py b/superset/db_engine_specs/bigquery.py index 70bc4bc845..58f2f1cdb2 100644 --- a/superset/db_engine_specs/bigquery.py +++ b/superset/db_engine_specs/bigquery.py @@ -17,20 +17,21 @@ from __future__ import annotations +import logging import re import urllib from datetime import datetime from re import Pattern +from textwrap import dedent from typing import Any, TYPE_CHECKING, TypedDict import pandas as pd from apispec import APISpec from apispec.ext.marshmallow import MarshmallowPlugin -from deprecation import deprecated from flask_babel import gettext as __ from marshmallow import fields, Schema from 
marshmallow.exceptions import ValidationError -from sqlalchemy import column, types +from sqlalchemy import column, func, types from sqlalchemy.engine.base import Engine from sqlalchemy.engine.reflection import Inspector from sqlalchemy.engine.url import URL @@ -49,6 +50,11 @@ from superset.superset_typing import ResultSetColumnType from superset.utils import core as utils, json from superset.utils.hashing import md5_sha_from_str +if TYPE_CHECKING: + from sqlalchemy.sql.expression import Select + +logger = logging.getLogger(__name__) + try: from google.cloud import bigquery from google.oauth2 import service_account @@ -284,42 +290,51 @@ class BigQueryEngineSpec(BaseEngineSpec): # pylint: disable=too-many-public-met return "_" + md5_sha_from_str(label) @classmethod - @deprecated(deprecated_in="3.0") - def normalize_indexes(cls, indexes: list[dict[str, Any]]) -> list[dict[str, Any]]: - """ - Normalizes indexes for more consistency across db engines + def where_latest_partition( + cls, + database: Database, + table: Table, + query: Select, + columns: list[ResultSetColumnType] | None = None, + ) -> Select | None: + if partition_column := cls.get_time_partition_column(database, table): + max_partition_id = cls.get_max_partition_id(database, table) + query = query.where( + column(partition_column) == func.PARSE_DATE("%Y%m%d", max_partition_id) + ) - :param indexes: Raw indexes as returned by SQLAlchemy - :return: cleaner, more aligned index definition - """ - normalized_idxs = [] - # Fixing a bug/behavior observed in pybigquery==0.4.15 where - # the index's `column_names` == [None] - # Here we're returning only non-None indexes - for ix in indexes: - column_names = ix.get("column_names") or [] - ix["column_names"] = [col for col in column_names if col is not None] - if ix["column_names"]: - normalized_idxs.append(ix) - return normalized_idxs + return query @classmethod - def get_indexes( + def get_max_partition_id( cls, database: Database, - inspector: Inspector, table: 
Table, - ) -> list[dict[str, Any]]: - """ - Get the indexes associated with the specified schema/table. + ) -> Select | None: + sql = dedent(f"""\ + SELECT + MAX(partition_id) AS max_partition_id + FROM `{table.schema}.INFORMATION_SCHEMA.PARTITIONS` + WHERE table_name = '{table.table}' + """) + df = database.get_df(sql) + return df.iat[0, 0] - :param database: The database to inspect - :param inspector: The SQLAlchemy inspector - :param table: The table instance to inspect - :returns: The indexes - """ + @classmethod + def get_time_partition_column( + cls, + database: Database, + table: Table, + ) -> str | None: + with cls.get_engine( + database, catalog=table.catalog, schema=table.schema + ) as engine: + client = cls._get_client(engine, database) + bq_table = client.get_table(f"{table.schema}.{table.table}") - return cls.normalize_indexes(inspector.get_indexes(table.table, table.schema)) + if bq_table.time_partitioning is not None: + return bq_table.time_partitioning.field + return None @classmethod def get_extra_table_metadata( @@ -327,23 +342,37 @@ class BigQueryEngineSpec(BaseEngineSpec): # pylint: disable=too-many-public-met database: Database, table: Table, ) -> dict[str, Any]: - indexes = database.get_indexes(table) - if not indexes: - return {} - partitions_columns = [ - index.get("column_names", []) - for index in indexes - if index.get("name") == "partition" - ] - cluster_columns = [ - index.get("column_names", []) - for index in indexes - if index.get("name") == "clustering" - ] - return { - "partitions": {"cols": partitions_columns}, - "clustering": {"cols": cluster_columns}, - } + payload = {} + partition_column = cls.get_time_partition_column(database, table) + with cls.get_engine( + database, catalog=table.catalog, schema=table.schema + ) as engine: + if partition_column: + max_partition_id = cls.get_max_partition_id(database, table) + payload.update( + { + "partitions": { + "cols": [partition_column], + "latest": {"ds": max_partition_id}, + }, + 
"partitionQuery": cls.select_star( + database, + table, + engine, + indent=False, + show_cols=False, + latest_partition=True, + ), + "indexes": [ + { + "name": "partitioned", + "cols": [partition_column], + "type": "partitioned", + } + ], + } + ) + return payload @classmethod def epoch_to_dttm(cls) -> str:
