This is an automated email from the ASF dual-hosted git repository.
vavila pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new 754ccd0448 feat: Mutate SQL query executed by alerts (#31840)
754ccd0448 is described below
commit 754ccd0448737ab4048a14aa7aa35468b78cbd04
Author: Vitor Avila <[email protected]>
AuthorDate: Wed Jan 15 00:43:40 2025 -0300
feat: Mutate SQL query executed by alerts (#31840)
---
superset/commands/report/alert.py | 19 +++-
superset/commands/report/execute.py | 4 +-
superset/config.py | 4 +
tests/integration_tests/reports/alert_tests.py | 121 ++++++++++++++++++++++++-
4 files changed, 140 insertions(+), 8 deletions(-)
diff --git a/superset/commands/report/alert.py b/superset/commands/report/alert.py
index ea45853b29..d713c45811 100644
--- a/superset/commands/report/alert.py
+++ b/superset/commands/report/alert.py
@@ -20,6 +20,7 @@ import logging
from operator import eq, ge, gt, le, lt, ne
from timeit import default_timer
from typing import Any
+from uuid import UUID
import numpy as np
import pandas as pd
@@ -40,6 +41,7 @@ from superset.reports.models import ReportSchedule, ReportScheduleValidatorType
from superset.tasks.utils import get_executor
from superset.utils import json
from superset.utils.core import override_user
+from superset.utils.decorators import logs_context
from superset.utils.retries import retry_call
logger = logging.getLogger(__name__)
@@ -52,8 +54,9 @@ OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt,
"==": eq, "!=": ne}
class AlertCommand(BaseCommand):
- def __init__(self, report_schedule: ReportSchedule):
+ def __init__(self, report_schedule: ReportSchedule, execution_id: UUID):
self._report_schedule = report_schedule
+ self._execution_id = execution_id
self._result: float | None = None
def run(self) -> bool:
@@ -135,6 +138,13 @@ class AlertCommand(BaseCommand):
self._report_schedule.validator_type ==
ReportScheduleValidatorType.OPERATOR
)
+ def _get_alert_metadata_from_object(self) -> dict[str, Any]:
+ return {
+ "report_schedule_id": self._report_schedule.id,
+ "execution_id": self._execution_id,
+ }
+
+ @logs_context(context_func=_get_alert_metadata_from_object)
def _execute_query(self) -> pd.DataFrame:
"""
Executes the actual alert SQL query template
@@ -152,6 +162,13 @@ class AlertCommand(BaseCommand):
rendered_sql, ALERT_SQL_LIMIT
)
+ if app.config["MUTATE_ALERT_QUERY"]:
+ limited_rendered_sql = (
+ self._report_schedule.database.mutate_sql_based_on_config(
+ limited_rendered_sql
+ )
+ )
+
executor, username = get_executor( # pylint: disable=unused-variable
executor_types=app.config["ALERT_REPORTS_EXECUTE_AS"],
model=self._report_schedule,
diff --git a/superset/commands/report/execute.py b/superset/commands/report/execute.py
index 9293e967aa..54a2890a96 100644
--- a/superset/commands/report/execute.py
+++ b/superset/commands/report/execute.py
@@ -698,7 +698,7 @@ class ReportNotTriggeredErrorState(BaseReportState):
try:
# If it's an alert check if the alert is triggered
if self._report_schedule.type == ReportScheduleType.ALERT:
- if not AlertCommand(self._report_schedule).run():
+ if not AlertCommand(self._report_schedule,
self._execution_id).run():
self.update_report_schedule_and_log(ReportState.NOOP)
return
self.send()
@@ -782,7 +782,7 @@ class ReportSuccessState(BaseReportState):
return
self.update_report_schedule_and_log(ReportState.WORKING)
try:
- if not AlertCommand(self._report_schedule).run():
+ if not AlertCommand(self._report_schedule,
self._execution_id).run():
self.update_report_schedule_and_log(ReportState.NOOP)
return
except Exception as ex:
diff --git a/superset/config.py b/superset/config.py
index a77ab15f20..fa55318f69 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -1380,6 +1380,10 @@ def SQL_QUERY_MUTATOR( # pylint: disable=invalid-name,unused-argument # noqa:
MUTATE_AFTER_SPLIT = False
+# Boolean config that determines if alert SQL queries should also be mutated or not.
+MUTATE_ALERT_QUERY = False
+
+
# This allows for a user to add header data to any outgoing emails. For example,
# if you need to include metadata in the header or you want to change the specifications
# of the email title, header, or sender.
diff --git a/tests/integration_tests/reports/alert_tests.py b/tests/integration_tests/reports/alert_tests.py
index b4dfdabd1a..16ce8f3fed 100644
--- a/tests/integration_tests/reports/alert_tests.py
+++ b/tests/integration_tests/reports/alert_tests.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=invalid-name, unused-argument, import-outside-toplevel
+import uuid
from contextlib import nullcontext, suppress
from typing import Optional, Union
@@ -84,7 +85,7 @@ def test_execute_query_as_report_executor(
database=get_example_database(),
validator_config_json='{"op": "==", "threshold": 1}',
)
- command = AlertCommand(report_schedule=report_schedule)
+ command = AlertCommand(report_schedule=report_schedule,
execution_id=uuid.uuid4())
override_user_mock =
mocker.patch("superset.commands.report.alert.override_user")
cm = (
pytest.raises(type(expected_result))
@@ -98,6 +99,83 @@ def test_execute_query_as_report_executor(
app.config["ALERT_REPORTS_EXECUTE_AS"] = original_config
+def test_execute_query_mutate_query_enabled(
+ mocker: MockerFixture,
+ app_context: AppContext,
+ get_user,
+) -> None:
+ from superset.commands.report.alert import AlertCommand
+ from superset.reports.models import ReportSchedule
+
+ default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"]
+
+ app.config["MUTATE_ALERT_QUERY"] = True
+ mocker.patch("superset.commands.report.alert.override_user")
+ mock_df = mocker.MagicMock(spec=pd.DataFrame)
+ mock_df.empty = True
+ mock_database = get_example_database()
+ mock_get_df = mocker.patch.object(mock_database, "get_df",
return_value=mock_df)
+ mock_limited_sql = mocker.patch.object(mock_database, "apply_limit_to_sql")
+ mock_mutate_call = mocker.patch.object(mock_database,
"mutate_sql_based_on_config")
+
+ report_schedule = ReportSchedule(
+ created_by=get_user("admin"),
+ owners=[get_user("admin")],
+ type=ReportScheduleType.ALERT,
+ description="description",
+ crontab="0 9 * * *",
+ creation_method=ReportCreationMethod.ALERTS_REPORTS,
+ sql="SELECT 1",
+ grace_period=14400,
+ working_timeout=3600,
+ database=mock_database,
+ validator_config_json='{"op": "==", "threshold": 1}',
+ )
+ AlertCommand(report_schedule=report_schedule,
execution_id=uuid.uuid4()).run()
+
+ mock_mutate_call.assert_called_once_with(mock_limited_sql.return_value)
+ mock_get_df.assert_called_once_with(sql=mock_mutate_call.return_value)
+
+ app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff
+
+
+def test_execute_query_mutate_query_disabled(
+ mocker: MockerFixture,
+ app_context: AppContext,
+ get_user,
+) -> None:
+ from superset.commands.report.alert import AlertCommand
+ from superset.reports.models import ReportSchedule
+
+ default_alert_mutate_ff = app.config["MUTATE_ALERT_QUERY"]
+
+ app.config["MUTATE_ALERT_QUERY"] = False
+ mocker.patch("superset.commands.report.alert.override_user")
+ mock_database = mocker.MagicMock()
+
+ report_schedule = ReportSchedule(
+ created_by=get_user("admin"),
+ owners=[get_user("admin")],
+ type=ReportScheduleType.ALERT,
+ description="description",
+ crontab="0 9 * * *",
+ creation_method=ReportCreationMethod.ALERTS_REPORTS,
+ sql="SELECT 1",
+ grace_period=14400,
+ working_timeout=3600,
+ database=mock_database,
+ validator_config_json='{"op": "==", "threshold": 1}',
+ )
+ AlertCommand(report_schedule=report_schedule,
execution_id=uuid.uuid4()).run()
+
+ mock_database.mutate_sql_based_on_config.assert_not_called()
+ mock_database.get_df.assert_called_once_with(
+ sql=mock_database.apply_limit_to_sql.return_value
+ )
+
+ app.config["MUTATE_ALERT_QUERY"] = default_alert_mutate_ff
+
+
def test_execute_query_succeeded_no_retry(
mocker: MockerFixture, app_context: None
) -> None:
@@ -108,7 +186,7 @@ def test_execute_query_succeeded_no_retry(
side_effect=lambda: pd.DataFrame([{"sample_col": 0}]),
)
- command = AlertCommand(report_schedule=mocker.Mock())
+ command = AlertCommand(report_schedule=mocker.Mock(),
execution_id=uuid.uuid4())
command.validate()
@@ -140,7 +218,7 @@ def test_execute_query_succeeded_with_retries(
execute_query_mock.side_effect = _mocked_execute_query
execute_query_mock.__name__ = "mocked_execute_query"
- command = AlertCommand(report_schedule=mocker.Mock())
+ command = AlertCommand(report_schedule=mocker.Mock(),
execution_id=uuid.uuid4())
command.validate()
@@ -162,7 +240,7 @@ def test_execute_query_failed_no_retry(
execute_query_mock.side_effect = _mocked_execute_query
execute_query_mock.__name__ = "mocked_execute_query"
- command = AlertCommand(report_schedule=mocker.Mock())
+ command = AlertCommand(report_schedule=mocker.Mock(),
execution_id=uuid.uuid4())
with suppress(AlertQueryTimeout):
command.validate()
@@ -184,9 +262,42 @@ def test_execute_query_failed_max_retries(
execute_query_mock.side_effect = _mocked_execute_query
execute_query_mock.__name__ = "mocked_execute_query"
- command = AlertCommand(report_schedule=mocker.Mock())
+ command = AlertCommand(report_schedule=mocker.Mock(),
execution_id=uuid.uuid4())
with suppress(AlertQueryError):
command.validate()
# Should match the value defined in superset_test_config.py
assert execute_query_mock.call_count == 3
+
+
+def test_get_alert_metadata_from_object(
+ mocker: MockerFixture,
+ app_context: AppContext,
+ get_user,
+) -> None:
+ from superset.commands.report.alert import AlertCommand
+ from superset.reports.models import ReportSchedule
+
+ app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER]
+
+ mock_database = mocker.MagicMock()
+ mock_exec_id = uuid.uuid4()
+ report_schedule = ReportSchedule(
+ created_by=get_user("admin"),
+ owners=[get_user("admin")],
+ type=ReportScheduleType.ALERT,
+ description="description",
+ crontab="0 9 * * *",
+ creation_method=ReportCreationMethod.ALERTS_REPORTS,
+ sql="SELECT 1",
+ grace_period=14400,
+ working_timeout=3600,
+ database=mock_database,
+ validator_config_json='{"op": "==", "threshold": 1}',
+ )
+
+ cm = AlertCommand(report_schedule=report_schedule,
execution_id=mock_exec_id)
+ assert cm._get_alert_metadata_from_object() == {
+ "report_schedule_id": report_schedule.id,
+ "execution_id": mock_exec_id,
+ }