This is an automated email from the ASF dual-hosted git repository. beto pushed a commit to branch chart-telemetry in repository https://gitbox.apache.org/repos/asf/superset.git
commit 7c47b51928677908566a726fa2c696618c258e32 Author: Beto Dealmeida <[email protected]> AuthorDate: Wed Mar 13 13:26:03 2024 -0400 feat: chart telemetry --- superset/charts/data/api.py | 22 ++++++---- superset/commands/chart/data/get_data_command.py | 1 + superset/extensions/telemetry.py | 56 ++++++++++++++++++++++++ superset/utils/decorators.py | 35 ++++++++++++++- superset/views/base.py | 6 +++ 5 files changed, 111 insertions(+), 9 deletions(-) diff --git a/superset/charts/data/api.py b/superset/charts/data/api.py index 2e46eb2737..a7a021b97a 100644 --- a/superset/charts/data/api.py +++ b/superset/charts/data/api.py @@ -53,7 +53,7 @@ from superset.utils.core import ( get_user_id, json_int_dttm_ser, ) -from superset.utils.decorators import logs_context +from superset.utils.decorators import logs_context, show_telemetry from superset.views.base import CsvResponse, generate_download_headers, XlsxResponse from superset.views.base_api import statsd_metrics @@ -181,6 +181,7 @@ class ChartDataRestApi(ChartRestApi): @expose("/data", methods=("POST",)) @protect() + @show_telemetry @statsd_metrics @event_logger.log_this_with_context( action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.data", @@ -225,6 +226,8 @@ class ChartDataRestApi(ChartRestApi): 500: $ref: '#/components/responses/500' """ + g.telemetry.add("Computing chart data") + json_body = None if request.is_json: json_body = request.json @@ -358,7 +361,8 @@ class ChartDataRestApi(ChartRestApi): # This is needed for sending reports based on text charts that do the # post-processing of data, eg, the pivot table. 
if result_type == ChartDataResultType.POST_PROCESSED: - result = apply_post_process(result, form_data, datasource) + with g.telemetry("Post processing data"): + result = apply_post_process(result, form_data, datasource) if result_format in ChartDataResultFormat.table_like(): # Verify user has permission to export file @@ -396,11 +400,12 @@ class ChartDataRestApi(ChartRestApi): ) if result_format == ChartDataResultFormat.JSON: - response_data = simplejson.dumps( - {"result": result["queries"]}, - default=json_int_dttm_ser, - ignore_nan=True, - ) + with g.telemetry("JSON encoding"): + response_data = simplejson.dumps( + {"result": result["queries"]}, + default=json_int_dttm_ser, + ignore_nan=True, + ) resp = make_response(response_data, 200) resp.headers["Content-Type"] = "application/json; charset=utf-8" return resp @@ -415,7 +420,8 @@ class ChartDataRestApi(ChartRestApi): datasource: BaseDatasource | Query | None = None, ) -> Response: try: - result = command.run(force_cached=force_cached) + with g.telemetry("Running command"): + result = command.run(force_cached=force_cached) except ChartDataCacheLoadError as exc: return self.response_422(message=exc.message) except ChartDataQueryFailedError as exc: diff --git a/superset/commands/chart/data/get_data_command.py b/superset/commands/chart/data/get_data_command.py index 971c343cba..79fc83941c 100644 --- a/superset/commands/chart/data/get_data_command.py +++ b/superset/commands/chart/data/get_data_command.py @@ -17,6 +17,7 @@ import logging from typing import Any +from flask import g from flask_babel import gettext as _ from superset.commands.base import BaseCommand diff --git a/superset/extensions/telemetry.py b/superset/extensions/telemetry.py new file mode 100644 index 0000000000..b804776db0 --- /dev/null +++ b/superset/extensions/telemetry.py @@ -0,0 +1,56 @@ +import time +from collections.abc import Iterator +from contextlib import contextmanager + + +class TelemetryHandler: + """ + Handler for telemetry events. 
+ + To use this, decorate an endpoint with `@show_telemetry`: + + @expose("/") + @show_telemetry + def some_endpoint() -> str: + g.telemetry.add("STARTED_COMPUTATION") + output = {"answer": some_computation()} + g.telemetry.add("ENDED_COMPUTATION") + return jsonify(output) + + The response payload will then look like this: + + { + "answer": 42, + "telemetry": { + 1710345893.4794712: 'STARTED_COMPUTATION', + 1710345900.3592598: 'ENDED_COMPUTATION', + }, + } + + """ + + def __init__(self) -> None: + self.events: list[tuple[str, float]] = [] + + @contextmanager + def __call__(self, event: str) -> Iterator[None]: + """ + Context manager for start/end events. + + with g.telemetry("Run query"): + run_query() + + Will produce the events "Run query START" and "Run query END". + """ + self.add(f"{event} START") + try: + yield + self.add(f"{event} END") + except Exception: # pylint: disable=broad-exception-caught + self.add(f"{event} FAILED") + + def add(self, event: str) -> None: + self.events.append((event, time.time())) + + def to_dict(self) -> dict[float, str]: + return {event_time: event_name for event_name, event_time in self.events} # diff --git a/superset/utils/decorators.py b/superset/utils/decorators.py index 7e34b98360..fdc764ba36 100644 --- a/superset/utils/decorators.py +++ b/superset/utils/decorators.py @@ -20,10 +20,11 @@ import logging import time from collections.abc import Iterator from contextlib import contextmanager +from functools import wraps from typing import Any, Callable, TYPE_CHECKING from uuid import UUID -from flask import current_app, g, Response +from flask import current_app, g, jsonify, Response from superset.utils import core as utils from superset.utils.dates import now_as_float @@ -210,3 +211,35 @@ def suppress_logging( yield finally: target_logger.setLevel(original_level) + + +def show_telemetry(f: Callable[..., Any]) -> Callable[..., Any]: + """ + For JSON responses, add telemetry information to the payload. 
+ + This allows us to instrument the stack, by adding timestamps at different levels, + eg: + + g.telemetry.add("START_RUN_QUERY") + data = run_query(sql) + g.telemetry.add("END_RUN_QUERY") + + And then we can display this information in the UI. + """ + + @wraps(f) + def wrapped(*args: Any, **kwargs: Any) -> Any: + result = f(*args, **kwargs) + if hasattr(result, "get_json"): + try: + json_data = result.get_json() + except Exception: # pylint: disable=broad-exception-caught + return result + + if isinstance(json_data, dict) and hasattr(g, "telemetry"): + json_data["telemetry"] = g.telemetry.to_dict() + return jsonify(json_data) + + return result + + return wrapped diff --git a/superset/views/base.py b/superset/views/base.py index c8b4862710..1402953492 100644 --- a/superset/views/base.py +++ b/superset/views/base.py @@ -74,6 +74,7 @@ from superset.exceptions import ( SupersetSecurityException, ) from superset.extensions import cache_manager +from superset.extensions.telemetry import TelemetryHandler from superset.models.helpers import ImportExportMixin from superset.reports.models import ReportRecipientType from superset.superset_typing import FlaskResponse @@ -725,3 +726,8 @@ def apply_http_headers(response: Response) -> Response: if k not in response.headers: response.headers[k] = v return response + + +@superset_app.before_request +def start_telemetry() -> None: + g.telemetry = TelemetryHandler()
