This is an automated email from the ASF dual-hosted git repository. vavila pushed a commit to branch feat/mutate-alert-query in repository https://gitbox.apache.org/repos/asf/superset.git
commit 42d8c2753bc353e68d32ca52f0286ea6cb607409 Author: Vitor Avila <[email protected]> AuthorDate: Tue Jan 14 15:03:07 2025 -0300 feat: Mutate SQL query executed by alerts --- superset/commands/report/alert.py | 19 +++- superset/commands/report/execute.py | 4 +- superset/config.py | 4 + tests/integration_tests/reports/alert_tests.py | 126 +++++++++++++++++++++++-- 4 files changed, 144 insertions(+), 9 deletions(-) diff --git a/superset/commands/report/alert.py b/superset/commands/report/alert.py index ea45853b29..d713c45811 100644 --- a/superset/commands/report/alert.py +++ b/superset/commands/report/alert.py @@ -20,6 +20,7 @@ import logging from operator import eq, ge, gt, le, lt, ne from timeit import default_timer from typing import Any +from uuid import UUID import numpy as np import pandas as pd @@ -40,6 +41,7 @@ from superset.reports.models import ReportSchedule, ReportScheduleValidatorType from superset.tasks.utils import get_executor from superset.utils import json from superset.utils.core import override_user +from superset.utils.decorators import logs_context from superset.utils.retries import retry_call logger = logging.getLogger(__name__) @@ -52,8 +54,9 @@ OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne} class AlertCommand(BaseCommand): - def __init__(self, report_schedule: ReportSchedule): + def __init__(self, report_schedule: ReportSchedule, execution_id: UUID): self._report_schedule = report_schedule + self._execution_id = execution_id self._result: float | None = None def run(self) -> bool: @@ -135,6 +138,13 @@ class AlertCommand(BaseCommand): self._report_schedule.validator_type == ReportScheduleValidatorType.OPERATOR ) + def _get_alert_metadata_from_object(self) -> dict[str, Any]: + return { + "report_schedule_id": self._report_schedule.id, + "execution_id": self._execution_id, + } + + @logs_context(context_func=_get_alert_metadata_from_object) def _execute_query(self) -> pd.DataFrame: """ Executes the actual alert SQL query template @@ -152,6 +162,13 @@ class AlertCommand(BaseCommand): rendered_sql, ALERT_SQL_LIMIT ) + if app.config["MUTATE_ALERT_QUERY"]: + limited_rendered_sql = ( + self._report_schedule.database.mutate_sql_based_on_config( + limited_rendered_sql + ) + ) + executor, username = get_executor( # pylint: disable=unused-variable executor_types=app.config["ALERT_REPORTS_EXECUTE_AS"], model=self._report_schedule, diff --git a/superset/commands/report/execute.py b/superset/commands/report/execute.py index 9293e967aa..54a2890a96 100644 --- a/superset/commands/report/execute.py +++ b/superset/commands/report/execute.py @@ -698,7 +698,7 @@ class ReportNotTriggeredErrorState(BaseReportState): try: # If it's an alert check if the alert is triggered if self._report_schedule.type == ReportScheduleType.ALERT: - if not AlertCommand(self._report_schedule).run(): + if not AlertCommand(self._report_schedule, self._execution_id).run(): self.update_report_schedule_and_log(ReportState.NOOP) return self.send() @@ -782,7 +782,7 @@ class ReportSuccessState(BaseReportState): return self.update_report_schedule_and_log(ReportState.WORKING) try: - if not AlertCommand(self._report_schedule).run(): + if not AlertCommand(self._report_schedule, self._execution_id).run(): self.update_report_schedule_and_log(ReportState.NOOP) return except Exception as ex: diff --git a/superset/config.py b/superset/config.py index f36ffade3c..f99751eeb2 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1380,6 +1380,10 @@ def SQL_QUERY_MUTATOR( # pylint: disable=invalid-name,unused-argument # noqa: MUTATE_AFTER_SPLIT = False +# Boolean config that determines if alert SQL queries should also be mutated or not. +MUTATE_ALERT_QUERY = False + + # This allows for a user to add header data to any outgoing emails. For example, # if you need to include metadata in the header or you want to change the specifications # of the email title, header, or sender. diff --git a/tests/integration_tests/reports/alert_tests.py b/tests/integration_tests/reports/alert_tests.py index b4dfdabd1a..c773f67554 100644 --- a/tests/integration_tests/reports/alert_tests.py +++ b/tests/integration_tests/reports/alert_tests.py @@ -15,8 +15,9 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=invalid-name, unused-argument, import-outside-toplevel +import uuid from contextlib import nullcontext, suppress -from typing import Optional, Union +from typing import Any, Optional, Union import pandas as pd import pytest @@ -84,7 +85,7 @@ def test_execute_query_as_report_executor( database=get_example_database(), validator_config_json='{"op": "==", "threshold": 1}', ) - command = AlertCommand(report_schedule=report_schedule) + command = AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()) override_user_mock = mocker.patch("superset.commands.report.alert.override_user") cm = ( pytest.raises(type(expected_result)) @@ -98,6 +99,86 @@ def test_execute_query_as_report_executor( app.config["ALERT_REPORTS_EXECUTE_AS"] = original_config +def test_execute_query_mutate_query_enabled( + mocker: MockerFixture, + app_context: AppContext, + get_user, +) -> None: + from superset.commands.report.alert import AlertCommand + from superset.reports.models import ReportSchedule + + def mock_mutate_query(sql: str, **kwargs: Any) -> str: + return "before " + sql + " after" + + app.config["SQL_QUERY_MUTATOR"] = mock_mutate_query + app.config["MUTATE_ALERT_QUERY"] = True + app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER] + + mocker.patch("superset.commands.report.alert.override_user") + mock_df = mocker.MagicMock(spec=pd.DataFrame) + mock_df.empty = True + mock_database = get_example_database() + mock_get_df = mocker.patch.object(mock_database, "get_df", return_value=mock_df) + mock_limited_sql = mocker.patch.object( + mock_database, "apply_limit_to_sql", return_value="SELECT 1\nLIMIT 2" + ) + + report_schedule = ReportSchedule( + created_by=get_user("admin"), + owners=[get_user("admin")], + type=ReportScheduleType.ALERT, + description="description", + crontab="0 9 * * *", + creation_method=ReportCreationMethod.ALERTS_REPORTS, + sql="SELECT 1", + grace_period=14400, + working_timeout=3600, + database=mock_database, + validator_config_json='{"op": "==", "threshold": 1}', + ) + + AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run() + mock_get_df.assert_called_once_with( + sql=f"before {mock_limited_sql.return_value} after" + ) + + +def test_execute_query_mutate_query_disabled( + mocker: MockerFixture, + app_context: AppContext, + get_user, +) -> None: + from superset.commands.report.alert import AlertCommand + from superset.reports.models import ReportSchedule + + def mock_mutate_query(sql: str, **kwargs: Any) -> str: + return "before " + sql + " after" + + app.config["SQL_QUERY_MUTATOR"] = mock_mutate_query + app.config["MUTATE_ALERT_QUERY"] = False + app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER] + + mocker.patch("superset.commands.report.alert.override_user") + mock_database = mocker.MagicMock() + + report_schedule = ReportSchedule( + created_by=get_user("admin"), + owners=[get_user("admin")], + type=ReportScheduleType.ALERT, + description="description", + crontab="0 9 * * *", + creation_method=ReportCreationMethod.ALERTS_REPORTS, + sql="SELECT 1", + grace_period=14400, + working_timeout=3600, + database=mock_database, + validator_config_json='{"op": "==", "threshold": 1}', + ) + + AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run() + mock_database.mutate_sql_based_on_config.assert_not_called() + + def test_execute_query_succeeded_no_retry( mocker: MockerFixture, app_context: None ) -> None: @@ -108,7 +189,7 @@ def test_execute_query_succeeded_no_retry( side_effect=lambda: pd.DataFrame([{"sample_col": 0}]), ) - command = AlertCommand(report_schedule=mocker.Mock()) + command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4()) command.validate() @@ -140,7 +221,7 @@ def test_execute_query_succeeded_with_retries( execute_query_mock.side_effect = _mocked_execute_query execute_query_mock.__name__ = "mocked_execute_query" - command = AlertCommand(report_schedule=mocker.Mock()) + command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4()) command.validate() @@ -162,7 +243,7 @@ def test_execute_query_failed_no_retry( execute_query_mock.side_effect = _mocked_execute_query execute_query_mock.__name__ = "mocked_execute_query" - command = AlertCommand(report_schedule=mocker.Mock()) + command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4()) with suppress(AlertQueryTimeout): command.validate() @@ -184,9 +265,42 @@ def test_execute_query_failed_max_retries( execute_query_mock.side_effect = _mocked_execute_query execute_query_mock.__name__ = "mocked_execute_query" - command = AlertCommand(report_schedule=mocker.Mock()) + command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4()) with suppress(AlertQueryError): command.validate() # Should match the value defined in superset_test_config.py assert execute_query_mock.call_count == 3 + + +def test_get_alert_metadata_from_object( + mocker: MockerFixture, + app_context: AppContext, + get_user, +) -> None: + from superset.commands.report.alert import AlertCommand + from superset.reports.models import ReportSchedule + + app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER] + + mock_database = mocker.MagicMock() + mock_exec_id = uuid.uuid4() + report_schedule = ReportSchedule( + created_by=get_user("admin"), + owners=[get_user("admin")], + type=ReportScheduleType.ALERT, + description="description", + crontab="0 9 * * *", + creation_method=ReportCreationMethod.ALERTS_REPORTS, + sql="SELECT 1", + grace_period=14400, + working_timeout=3600, + database=mock_database, + validator_config_json='{"op": "==", "threshold": 1}', + ) + + cm = AlertCommand(report_schedule=report_schedule, execution_id=mock_exec_id) + assert cm._get_alert_metadata_from_object() == { + "report_schedule_id": report_schedule.id, + "execution_id": mock_exec_id, + }
