This is an automated email from the ASF dual-hosted git repository.
beto pushed a commit to branch explorable
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/explorable by this push:
new debcde2057 Rearrange mapper
debcde2057 is described below
commit debcde20576f58678105630ddd618ea59248ca29
Author: Beto Dealmeida <[email protected]>
AuthorDate: Wed Oct 29 15:41:14 2025 -0400
Rearrange mapper
---
superset/semantic_layers/mapper.py | 289 ++++++++++++++++++-------------------
1 file changed, 143 insertions(+), 146 deletions(-)
diff --git a/superset/semantic_layers/mapper.py b/superset/semantic_layers/mapper.py
index 413508a9ce..9da28ecef0 100644
--- a/superset/semantic_layers/mapper.py
+++ b/superset/semantic_layers/mapper.py
@@ -15,6 +15,14 @@
# specific language governing permissions and limitations
# under the License.
+"""
+Functions for mapping `QueryObject` to semantic layers.
+
+These functions validate and convert a `QueryObject` into one or more `SemanticQuery`,
+which are then passed to semantic layer implementations for execution.
+
+"""
+
from datetime import datetime
import numpy as np
@@ -36,22 +44,133 @@ from superset.semantic_layers.types import (
SemanticQuery,
SemanticResult,
SemanticViewFeature,
- SemanticViewImplementation,
TimeGrain,
)
from superset.utils.core import FilterOperator, TIME_COMPARISON
from superset.utils.date_parser import get_past_or_future
+def get_results(query_object: QueryObject) -> SemanticResult:
+ """
+ Run 1+ queries based on `QueryObject` and return the results.
+
+ :param query_object: The QueryObject containing query specifications
+ :return: SemanticResult with combined DataFrame and all requests
+ """
+ validate_query_object(query_object)
+
+ semantic_view = query_object.datasource.implementation
+ dispatcher = (
+ semantic_view.get_row_count
+ if query_object.is_rowcount
+ else semantic_view.get_dataframe
+ )
+
+ # Step 1: Convert QueryObject to list of SemanticQuery objects
+    # The first query is the main query, subsequent queries are for time offsets
+ queries = map_query_object(query_object)
+
+ # Step 2: Execute the main query (first in the list)
+ main_query = queries[0]
+ main_result = dispatcher(
+ metrics=main_query.metrics,
+ dimensions=main_query.dimensions,
+ filters=main_query.filters,
+ order=main_query.order,
+ limit=main_query.limit,
+ offset=main_query.offset,
+ group_limit=main_query.group_limit,
+ )
+
+ main_df = main_result.results
+
+    # Collect all requests (SQL queries, HTTP requests, etc.) for troubleshooting
+ all_requests = list(main_result.requests)
+
+ # If no time offsets, return the main result as-is
+ if not query_object.time_offsets or len(queries) <= 1:
+ return SemanticResult(
+ requests=all_requests,
+ results=main_df,
+ )
+
+ # Get metric names from the main query
+ # These are the columns that will be renamed with offset suffixes
+ metric_names = [metric.name for metric in main_query.metrics]
+
+ # Join keys are all columns except metrics
+ # These will be used to match rows between main and offset DataFrames
+ join_keys = [col for col in main_df.columns if col not in metric_names]
+
+ # Step 3 & 4: Execute each time offset query and join results
+ for offset_query, time_offset in zip(
+ queries[1:],
+ query_object.time_offsets,
+ strict=False,
+ ):
+ # Execute the offset query
+ result = dispatcher(
+ metrics=offset_query.metrics,
+ dimensions=offset_query.dimensions,
+ filters=offset_query.filters,
+ order=offset_query.order,
+ limit=offset_query.limit,
+ offset=offset_query.offset,
+ group_limit=offset_query.group_limit,
+ )
+
+ # Add this query's requests to the collection
+ all_requests.extend(result.requests)
+
+ offset_df = result.results
+
+ # Handle empty results - add NaN columns directly instead of merging
+ # This avoids dtype mismatch issues with empty DataFrames
+ if offset_df.empty:
+ # Add offset metric columns with NaN values directly to main_df
+ for metric in metric_names:
+ offset_col_name = TIME_COMPARISON.join([metric, time_offset])
+ main_df[offset_col_name] = np.nan
+ else:
+ # Rename metric columns with time offset suffix
+ # Format: "{metric_name}__{time_offset}"
+ # Example: "revenue" -> "revenue__1 week ago"
+ offset_df = offset_df.rename(
+ columns={
+ metric: TIME_COMPARISON.join([metric, time_offset])
+ for metric in metric_names
+ }
+ )
+
+ # Step 5: Perform left join on dimension columns
+ # This preserves all rows from main_df and adds offset metrics
+ # where they match
+ main_df = main_df.merge(
+ offset_df,
+ on=join_keys,
+ how="left",
+ suffixes=("", "__duplicate"),
+ )
+
+ # Clean up any duplicate columns that might have been created
+        # (shouldn't happen with proper join keys, but defensive programming)
+ duplicate_cols = [
+ col for col in main_df.columns if col.endswith("__duplicate")
+ ]
+ if duplicate_cols:
+ main_df = main_df.drop(columns=duplicate_cols)
+
+ return SemanticResult(requests=all_requests, results=main_df)
+
+
def map_query_object(query_object: QueryObject) -> list[SemanticQuery]:
"""
Convert a `QueryObject` into a list of `SemanticQuery`.
-    This function maps the `QueryObject` into query objects that are less centered on
-    visualization, simplifying the process of adding new semantic layers to Superset.
+ This function maps the `QueryObject` into query objects that focus less on
+ visualization and more on semantics.
"""
semantic_view = query_object.datasource.implementation
- validate_query_object(query_object, semantic_view)
all_metrics = {metric.name: metric for metric in semantic_view.metrics}
all_dimensions = {
@@ -477,30 +596,26 @@ def _convert_time_grain(time_grain: str) -> TimeGrain | DateGrain | None:
return None
-def validate_query_object(
- query_object: QueryObject,
- semantic_view: SemanticViewImplementation,
-) -> None:
+def validate_query_object(query_object: QueryObject) -> None:
"""
Validate that the `QueryObject` is compatible with the `SemanticView`.
If some semantic view implementation supports these features we should add an
attribute to the `SemanticViewImplementation` to indicate support for them.
"""
- _validate_metrics(query_object, semantic_view)
- _validate_dimensions(query_object, semantic_view)
- _validate_granularity(query_object, semantic_view)
- _validate_group_limit(query_object, semantic_view)
- _validate_orderby(query_object, semantic_view)
+ _validate_metrics(query_object)
+ _validate_dimensions(query_object)
+ _validate_granularity(query_object)
+ _validate_group_limit(query_object)
+ _validate_orderby(query_object)
-def _validate_metrics(
- query_object: QueryObject,
- semantic_view: SemanticViewImplementation,
-) -> None:
+def _validate_metrics(query_object: QueryObject) -> None:
"""
Make sure metrics are defined in the semantic view.
"""
+ semantic_view = query_object.datasource.implementation
+
if any(not isinstance(metric, str) for metric in query_object.metrics):
raise ValueError("Adhoc metrics are not supported in Semantic Views.")
@@ -509,13 +624,12 @@ def _validate_metrics(
raise ValueError("All metrics must be defined in the Semantic View.")
-def _validate_dimensions(
- query_object: QueryObject,
- semantic_view: SemanticViewImplementation,
-) -> None:
+def _validate_dimensions(query_object: QueryObject) -> None:
"""
Make sure all dimensions are defined in the semantic view.
"""
+ semantic_view = query_object.datasource.implementation
+
if any(not isinstance(column, str) for column in query_object.columns):
raise ValueError("Adhoc dimensions are not supported in Semantic Views.")
@@ -524,13 +638,11 @@ def _validate_dimensions(
raise ValueError("All dimensions must be defined in the Semantic View.")
-def _validate_granularity(
- query_object: QueryObject,
- semantic_view: SemanticViewImplementation,
-) -> None:
+def _validate_granularity(query_object: QueryObject) -> None:
"""
Make sure time column and time grain are valid.
"""
+ semantic_view = query_object.datasource.implementation
dimension_names = {dimension.name for dimension in semantic_view.dimensions}
if time_column := query_object.granularity:
@@ -557,13 +669,12 @@ def _validate_granularity(
)
-def _validate_group_limit(
- query_object: QueryObject,
- semantic_view: SemanticViewImplementation,
-) -> None:
+def _validate_group_limit(query_object: QueryObject) -> None:
"""
Validate group limit related features in the query object.
"""
+ semantic_view = query_object.datasource.implementation
+
if (
query_object.series_columns
and SemanticViewFeature.GROUP_LIMIT not in semantic_view.features
@@ -584,13 +695,12 @@ def _validate_group_limit(
)
-def _validate_orderby(
- query_object: QueryObject,
- semantic_view: SemanticViewImplementation,
-) -> None:
+def _validate_orderby(query_object: QueryObject) -> None:
"""
Validate order by elements in the query object.
"""
+ semantic_view = query_object.datasource.implementation
+
if (
any(not isinstance(element, str) for element, _ in query_object.orderby)
and SemanticViewFeature.ADHOC_EXPRESSIONS_IN_ORDERBY
@@ -607,116 +717,3 @@ def _validate_orderby(
dimension_names = {dimension.name for dimension in semantic_view.dimensions}
if not elements <= metric_names | dimension_names:
raise ValueError("All order by elements must be defined in the Semantic View.")
-
-
-def get_results(query_object: QueryObject) -> SemanticResult:
- """
-    Run a query based on the `QueryObject` and return the results as a SemanticResult.
-
- :param query_object: The QueryObject containing query specifications
- :return: SemanticResult with combined DataFrame and all requests
- """
- semantic_view = query_object.datasource.implementation
- validate_query_object(query_object, semantic_view)
-
- dispatcher = (
- semantic_view.get_row_count
- if query_object.is_rowcount
- else semantic_view.get_dataframe
- )
-
- # Step 1: Convert QueryObject to list of SemanticQuery objects
- # The first query is the main query, subsequent queries are for time
offsets
- queries = map_query_object(query_object)
-
- # Step 2: Execute the main query (first in the list)
- main_query = queries[0]
- main_result = dispatcher(
- metrics=main_query.metrics,
- dimensions=main_query.dimensions,
- filters=main_query.filters,
- order=main_query.order,
- limit=main_query.limit,
- offset=main_query.offset,
- group_limit=main_query.group_limit,
- )
-
- main_df = main_result.results
-
-    # Collect all requests (SQL queries, HTTP requests, etc.) for troubleshooting
- all_requests = list(main_result.requests)
-
- # If no time offsets, return the main result as-is
- if not query_object.time_offsets or len(queries) <= 1:
- return SemanticResult(
- requests=all_requests,
- results=main_df,
- )
-
- # Get metric names from the main query
- # These are the columns that will be renamed with offset suffixes
- metric_names = [metric.name for metric in main_query.metrics]
-
- # Join keys are all columns except metrics
- # These will be used to match rows between main and offset DataFrames
- join_keys = [col for col in main_df.columns if col not in metric_names]
-
- # Step 3 & 4: Execute each time offset query and join results
- for offset_query, time_offset in zip(
- queries[1:],
- query_object.time_offsets,
- strict=False,
- ):
- # Execute the offset query
- result = dispatcher(
- metrics=offset_query.metrics,
- dimensions=offset_query.dimensions,
- filters=offset_query.filters,
- order=offset_query.order,
- limit=offset_query.limit,
- offset=offset_query.offset,
- group_limit=offset_query.group_limit,
- )
-
- # Add this query's requests to the collection
- all_requests.extend(result.requests)
-
- offset_df = result.results
-
- # Handle empty results - add NaN columns directly instead of merging
- # This avoids dtype mismatch issues with empty DataFrames
- if offset_df.empty:
- # Add offset metric columns with NaN values directly to main_df
- for metric in metric_names:
- offset_col_name = TIME_COMPARISON.join([metric, time_offset])
- main_df[offset_col_name] = np.nan
- else:
- # Rename metric columns with time offset suffix
- # Format: "{metric_name}__{time_offset}"
- # Example: "revenue" -> "revenue__1 week ago"
- offset_df = offset_df.rename(
- columns={
- metric: TIME_COMPARISON.join([metric, time_offset])
- for metric in metric_names
- }
- )
-
- # Step 5: Perform left join on dimension columns
- # This preserves all rows from main_df and adds offset metrics
- # where they match
- main_df = main_df.merge(
- offset_df,
- on=join_keys,
- how="left",
- suffixes=("", "__duplicate"),
- )
-
- # Clean up any duplicate columns that might have been created
-        # (shouldn't happen with proper join keys, but defensive programming)
- duplicate_cols = [
- col for col in main_df.columns if col.endswith("__duplicate")
- ]
- if duplicate_cols:
- main_df = main_df.drop(columns=duplicate_cols)
-
- return SemanticResult(requests=all_requests, results=main_df)