This is an automated email from the ASF dual-hosted git repository. beto pushed a commit to branch chart-telemetry in repository https://gitbox.apache.org/repos/asf/superset.git
commit e65ac7f2f095b3a5b2ad21cf6bc1be3cb2893b83 Author: Beto Dealmeida <[email protected]> AuthorDate: Wed Mar 13 13:26:03 2024 -0400 feat: chart telemetry --- superset/charts/data/api.py | 20 +++--- superset/commands/chart/data/get_data_command.py | 4 +- superset/common/query_context_processor.py | 1 + superset/extensions/telemetry.py | 79 ++++++++++++++++++++++++ superset/models/core.py | 50 ++++++++------- superset/utils/decorators.py | 34 +++++++++- superset/views/base.py | 6 ++ 7 files changed, 162 insertions(+), 32 deletions(-) diff --git a/superset/charts/data/api.py b/superset/charts/data/api.py index 2e46eb2737..91e906a3c7 100644 --- a/superset/charts/data/api.py +++ b/superset/charts/data/api.py @@ -53,7 +53,7 @@ from superset.utils.core import ( get_user_id, json_int_dttm_ser, ) -from superset.utils.decorators import logs_context +from superset.utils.decorators import logs_context, show_telemetry from superset.views.base import CsvResponse, generate_download_headers, XlsxResponse from superset.views.base_api import statsd_metrics @@ -181,6 +181,7 @@ class ChartDataRestApi(ChartRestApi): @expose("/data", methods=("POST",)) @protect() + @show_telemetry @statsd_metrics @event_logger.log_this_with_context( action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.data", @@ -358,7 +359,8 @@ class ChartDataRestApi(ChartRestApi): # This is needed for sending reports based on text charts that do the # post-processing of data, eg, the pivot table. 
if result_type == ChartDataResultType.POST_PROCESSED: - result = apply_post_process(result, form_data, datasource) + with g.telemetry("Post processing data"): + result = apply_post_process(result, form_data, datasource) if result_format in ChartDataResultFormat.table_like(): # Verify user has permission to export file @@ -396,11 +398,12 @@ class ChartDataRestApi(ChartRestApi): ) if result_format == ChartDataResultFormat.JSON: - response_data = simplejson.dumps( - {"result": result["queries"]}, - default=json_int_dttm_ser, - ignore_nan=True, - ) + with g.telemetry("JSON encoding"): + response_data = simplejson.dumps( + {"result": result["queries"]}, + default=json_int_dttm_ser, + ignore_nan=True, + ) resp = make_response(response_data, 200) resp.headers["Content-Type"] = "application/json; charset=utf-8" return resp @@ -415,7 +418,8 @@ class ChartDataRestApi(ChartRestApi): datasource: BaseDatasource | Query | None = None, ) -> Response: try: - result = command.run(force_cached=force_cached) + with g.telemetry("Running command"): + result = command.run(force_cached=force_cached) except ChartDataCacheLoadError as exc: return self.response_422(message=exc.message) except ChartDataQueryFailedError as exc: diff --git a/superset/commands/chart/data/get_data_command.py b/superset/commands/chart/data/get_data_command.py index 971c343cba..edf1df14a2 100644 --- a/superset/commands/chart/data/get_data_command.py +++ b/superset/commands/chart/data/get_data_command.py @@ -17,6 +17,7 @@ import logging from typing import Any +from flask import g from flask_babel import gettext as _ from superset.commands.base import BaseCommand @@ -43,7 +44,8 @@ class ChartDataCommand(BaseCommand): force_cached = kwargs.get("force_cached", False) try: payload = self._query_context.get_payload( - cache_query_context=cache_query_context, force_cached=force_cached + cache_query_context=cache_query_context, + force_cached=force_cached, ) except CacheLoadError as ex: raise 
ChartDataCacheLoadError(ex.message) from ex diff --git a/superset/common/query_context_processor.py b/superset/common/query_context_processor.py index d8b5bea4bb..6ed3df29b1 100644 --- a/superset/common/query_context_processor.py +++ b/superset/common/query_context_processor.py @@ -23,6 +23,7 @@ from typing import Any, ClassVar, TYPE_CHECKING, TypedDict import numpy as np import pandas as pd +from flask import g from flask_babel import gettext as _ from pandas import DateOffset diff --git a/superset/extensions/telemetry.py b/superset/extensions/telemetry.py new file mode 100644 index 0000000000..0eb5d78370 --- /dev/null +++ b/superset/extensions/telemetry.py @@ -0,0 +1,79 @@ +from __future__ import annotations + +import time +from collections.abc import Iterator +from contextlib import contextmanager +from typing import TypedDict + + +class TelemetryItem(TypedDict): + name: str + start: float + end: float | None + children: list[TelemetryItem] + + +class TelemetryHandler: + """ + Handler for telemetry events. 
+ + To use this, decorate an endpoint with `@show_telemetry`: + + @expose("/") + @show_telemetry + def some_endpoint() -> str: + with g.telemetry("Computation"): + output = {"answer": some_computation()} + + return jsonify(output) + + def some_computation() -> int: + with g.telemetry("Crunching numbers"): + return magic() + + The response payload will then look like this: + + { + # original response + "answer": 42, + + # added telemetry + "telemetry": [ + { + "name": "Computation", + "start": 1710360466.328792, + "end": 1710360472.7976031, + "children": [ + { + "name": "Crunching numbers", + "start": 1710360468.401769, + "end": 1710360470.532115, + "children": [], + }, + ], + }, + ], + } + + """ + + def __init__(self) -> None: + self.events: list[TelemetryItem] = [] + self.root = self.events + + @contextmanager + def __call__(self, name: str) -> Iterator[None]: + event: TelemetryItem = { + "name": name, + "start": time.time(), + "end": None, + "children": [], + } + self.root.append(event) + previous = self.root + self.root = event["children"] + try: + yield + finally: + event["end"] = time.time() + self.root = previous diff --git a/superset/models/core.py b/superset/models/core.py index 71a6e9d042..d1019351ec 100755 --- a/superset/models/core.py +++ b/superset/models/core.py @@ -579,35 +579,41 @@ class Database( ) with self.get_raw_connection(schema=schema) as conn: - cursor = conn.cursor() - for sql_ in sqls[:-1]: + with g.telemetry("Executing query"): + cursor = conn.cursor() + for sql_ in sqls[:-1]: + if mutate_after_split: + sql_ = sql_query_mutator( + sql_, + security_manager=security_manager, + database=None, + ) + _log_query(sql_) + self.db_engine_spec.execute(cursor, sql_) + cursor.fetchall() + if mutate_after_split: - sql_ = sql_query_mutator( - sql_, + last_sql = sql_query_mutator( + sqls[-1], security_manager=security_manager, database=None, ) - _log_query(sql_) - self.db_engine_spec.execute(cursor, sql_) - cursor.fetchall() - - if mutate_after_split: - 
last_sql = sql_query_mutator( - sqls[-1], - security_manager=security_manager, - database=None, - ) - _log_query(last_sql) - self.db_engine_spec.execute(cursor, last_sql) - else: - _log_query(sqls[-1]) - self.db_engine_spec.execute(cursor, sqls[-1]) + _log_query(last_sql) + self.db_engine_spec.execute(cursor, last_sql) + else: + _log_query(sqls[-1]) + self.db_engine_spec.execute(cursor, sqls[-1]) + + with g.telemetry("Fetching data from cursor"): + data = self.db_engine_spec.fetch_data(cursor) - data = self.db_engine_spec.fetch_data(cursor) result_set = SupersetResultSet( - data, cursor.description, self.db_engine_spec + data, + cursor.description, + self.db_engine_spec, ) - df = result_set.to_pandas_df() + with g.telemetry("Loading into dataframe"): + df = result_set.to_pandas_df() if mutator: df = mutator(df) diff --git a/superset/utils/decorators.py b/superset/utils/decorators.py index 7e34b98360..be29f9a5e4 100644 --- a/superset/utils/decorators.py +++ b/superset/utils/decorators.py @@ -20,10 +20,11 @@ import logging import time from collections.abc import Iterator from contextlib import contextmanager +from functools import wraps from typing import Any, Callable, TYPE_CHECKING from uuid import UUID -from flask import current_app, g, Response +from flask import current_app, g, jsonify, Response from superset.utils import core as utils from superset.utils.dates import now_as_float @@ -210,3 +211,34 @@ def suppress_logging( yield finally: target_logger.setLevel(original_level) + + +def show_telemetry(f: Callable[..., Any]) -> Callable[..., Any]: + """ + For JSON responses, add telemetry information to the payload. + + This allows us to instrument the stack, by adding timestamps at different levels, + eg: + + with g.telemetry("Run query"): + data = run_query(sql) + + And then we can display this information in the UI. 
+ """ + + @wraps(f) + def wrapped(*args: Any, **kwargs: Any) -> Any: + result = f(*args, **kwargs) + if hasattr(result, "get_json"): + try: + json_data = result.get_json() + except Exception: # pylint: disable=broad-exception-caught + return result + + if isinstance(json_data, dict) and hasattr(g, "telemetry"): + json_data["telemetry"] = g.telemetry.events + return jsonify(json_data) + + return result + + return wrapped diff --git a/superset/views/base.py b/superset/views/base.py index c8b4862710..1402953492 100644 --- a/superset/views/base.py +++ b/superset/views/base.py @@ -74,6 +74,7 @@ from superset.exceptions import ( SupersetSecurityException, ) from superset.extensions import cache_manager +from superset.extensions.telemetry import TelemetryHandler from superset.models.helpers import ImportExportMixin from superset.reports.models import ReportRecipientType from superset.superset_typing import FlaskResponse @@ -725,3 +726,8 @@ def apply_http_headers(response: Response) -> Response: if k not in response.headers: response.headers[k] = v return response + + +@superset_app.before_request +def start_telemetry() -> None: + g.telemetry = TelemetryHandler()
