This is an automated email from the ASF dual-hosted git repository.

vavila pushed a commit to branch feat/mutate-alert-query
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 42d8c2753bc353e68d32ca52f0286ea6cb607409
Author: Vitor Avila <[email protected]>
AuthorDate: Tue Jan 14 15:03:07 2025 -0300

    feat: Mutate SQL query executed by alerts
---
 superset/commands/report/alert.py              |  19 +++-
 superset/commands/report/execute.py            |   4 +-
 superset/config.py                             |   4 +
 tests/integration_tests/reports/alert_tests.py | 126 +++++++++++++++++++++++--
 4 files changed, 144 insertions(+), 9 deletions(-)

diff --git a/superset/commands/report/alert.py b/superset/commands/report/alert.py
index ea45853b29..d713c45811 100644
--- a/superset/commands/report/alert.py
+++ b/superset/commands/report/alert.py
@@ -20,6 +20,7 @@ import logging
 from operator import eq, ge, gt, le, lt, ne
 from timeit import default_timer
 from typing import Any
+from uuid import UUID
 
 import numpy as np
 import pandas as pd
@@ -40,6 +41,7 @@ from superset.reports.models import ReportSchedule, ReportScheduleValidatorType
 from superset.tasks.utils import get_executor
 from superset.utils import json
 from superset.utils.core import override_user
+from superset.utils.decorators import logs_context
 from superset.utils.retries import retry_call
 
 logger = logging.getLogger(__name__)
@@ -52,8 +54,9 @@ OPERATOR_FUNCTIONS = {">=": ge, ">": gt, "<=": le, "<": lt, "==": eq, "!=": ne}
 
 
 class AlertCommand(BaseCommand):
-    def __init__(self, report_schedule: ReportSchedule):
+    def __init__(self, report_schedule: ReportSchedule, execution_id: UUID):
         self._report_schedule = report_schedule
+        self._execution_id = execution_id
         self._result: float | None = None
 
     def run(self) -> bool:
@@ -135,6 +138,13 @@ class AlertCommand(BaseCommand):
             self._report_schedule.validator_type == ReportScheduleValidatorType.OPERATOR
         )
 
+    def _get_alert_metadata_from_object(self) -> dict[str, Any]:
+        return {
+            "report_schedule_id": self._report_schedule.id,
+            "execution_id": self._execution_id,
+        }
+
+    @logs_context(context_func=_get_alert_metadata_from_object)
     def _execute_query(self) -> pd.DataFrame:
         """
         Executes the actual alert SQL query template
@@ -152,6 +162,13 @@ class AlertCommand(BaseCommand):
                 rendered_sql, ALERT_SQL_LIMIT
             )
 
+            if app.config["MUTATE_ALERT_QUERY"]:
+                limited_rendered_sql = (
+                    self._report_schedule.database.mutate_sql_based_on_config(
+                        limited_rendered_sql
+                    )
+                )
+
             executor, username = get_executor(  # pylint: disable=unused-variable
                 executor_types=app.config["ALERT_REPORTS_EXECUTE_AS"],
                 model=self._report_schedule,
diff --git a/superset/commands/report/execute.py b/superset/commands/report/execute.py
index 9293e967aa..54a2890a96 100644
--- a/superset/commands/report/execute.py
+++ b/superset/commands/report/execute.py
@@ -698,7 +698,7 @@ class ReportNotTriggeredErrorState(BaseReportState):
         try:
             # If it's an alert check if the alert is triggered
             if self._report_schedule.type == ReportScheduleType.ALERT:
-                if not AlertCommand(self._report_schedule).run():
+                if not AlertCommand(self._report_schedule, self._execution_id).run():
                     self.update_report_schedule_and_log(ReportState.NOOP)
                     return
             self.send()
@@ -782,7 +782,7 @@ class ReportSuccessState(BaseReportState):
                 return
             self.update_report_schedule_and_log(ReportState.WORKING)
             try:
-                if not AlertCommand(self._report_schedule).run():
+                if not AlertCommand(self._report_schedule, self._execution_id).run():
                     self.update_report_schedule_and_log(ReportState.NOOP)
                     return
             except Exception as ex:
diff --git a/superset/config.py b/superset/config.py
index f36ffade3c..f99751eeb2 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -1380,6 +1380,10 @@ def SQL_QUERY_MUTATOR(  # pylint: disable=invalid-name,unused-argument  # noqa:
 MUTATE_AFTER_SPLIT = False
 
 
+# Boolean config that determines if alert SQL queries should also be mutated or not.
+MUTATE_ALERT_QUERY = False
+
+
 # This allows for a user to add header data to any outgoing emails. For example,
 # if you need to include metadata in the header or you want to change the specifications
 # of the email title, header, or sender.
diff --git a/tests/integration_tests/reports/alert_tests.py b/tests/integration_tests/reports/alert_tests.py
index b4dfdabd1a..c773f67554 100644
--- a/tests/integration_tests/reports/alert_tests.py
+++ b/tests/integration_tests/reports/alert_tests.py
@@ -15,8 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint: disable=invalid-name, unused-argument, import-outside-toplevel
+import uuid
 from contextlib import nullcontext, suppress
-from typing import Optional, Union
+from typing import Any, Optional, Union
 
 import pandas as pd
 import pytest
@@ -84,7 +85,7 @@ def test_execute_query_as_report_executor(
         database=get_example_database(),
         validator_config_json='{"op": "==", "threshold": 1}',
     )
-    command = AlertCommand(report_schedule=report_schedule)
+    command = AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4())
     override_user_mock = mocker.patch("superset.commands.report.alert.override_user")
     cm = (
         pytest.raises(type(expected_result))
@@ -98,6 +99,86 @@ def test_execute_query_as_report_executor(
     app.config["ALERT_REPORTS_EXECUTE_AS"] = original_config
 
 
+def test_execute_query_mutate_query_enabled(
+    mocker: MockerFixture,
+    app_context: AppContext,
+    get_user,
+) -> None:
+    from superset.commands.report.alert import AlertCommand
+    from superset.reports.models import ReportSchedule
+
+    def mock_mutate_query(sql: str, **kwargs: Any) -> str:
+        return "before " + sql + " after"
+
+    app.config["SQL_QUERY_MUTATOR"] = mock_mutate_query
+    app.config["MUTATE_ALERT_QUERY"] = True
+    app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER]
+
+    mocker.patch("superset.commands.report.alert.override_user")
+    mock_df = mocker.MagicMock(spec=pd.DataFrame)
+    mock_df.empty = True
+    mock_database = get_example_database()
+    mock_get_df = mocker.patch.object(mock_database, "get_df", return_value=mock_df)
+    mock_limited_sql = mocker.patch.object(
+        mock_database, "apply_limit_to_sql", return_value="SELECT 1\nLIMIT 2"
+    )
+
+    report_schedule = ReportSchedule(
+        created_by=get_user("admin"),
+        owners=[get_user("admin")],
+        type=ReportScheduleType.ALERT,
+        description="description",
+        crontab="0 9 * * *",
+        creation_method=ReportCreationMethod.ALERTS_REPORTS,
+        sql="SELECT 1",
+        grace_period=14400,
+        working_timeout=3600,
+        database=mock_database,
+        validator_config_json='{"op": "==", "threshold": 1}',
+    )
+
+    AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()
+    mock_get_df.assert_called_once_with(
+        sql=f"before {mock_limited_sql.return_value} after"
+    )
+
+
+def test_execute_query_mutate_query_disabled(
+    mocker: MockerFixture,
+    app_context: AppContext,
+    get_user,
+) -> None:
+    from superset.commands.report.alert import AlertCommand
+    from superset.reports.models import ReportSchedule
+
+    def mock_mutate_query(sql: str, **kwargs: Any) -> str:
+        return "before " + sql + " after"
+
+    app.config["SQL_QUERY_MUTATOR"] = mock_mutate_query
+    app.config["MUTATE_ALERT_QUERY"] = False
+    app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER]
+
+    mocker.patch("superset.commands.report.alert.override_user")
+    mock_database = mocker.MagicMock()
+
+    report_schedule = ReportSchedule(
+        created_by=get_user("admin"),
+        owners=[get_user("admin")],
+        type=ReportScheduleType.ALERT,
+        description="description",
+        crontab="0 9 * * *",
+        creation_method=ReportCreationMethod.ALERTS_REPORTS,
+        sql="SELECT 1",
+        grace_period=14400,
+        working_timeout=3600,
+        database=mock_database,
+        validator_config_json='{"op": "==", "threshold": 1}',
+    )
+
+    AlertCommand(report_schedule=report_schedule, execution_id=uuid.uuid4()).run()
+    mock_database.mutate_sql_based_on_config.assert_not_called()
+
+
 def test_execute_query_succeeded_no_retry(
     mocker: MockerFixture, app_context: None
 ) -> None:
@@ -108,7 +189,7 @@ def test_execute_query_succeeded_no_retry(
         side_effect=lambda: pd.DataFrame([{"sample_col": 0}]),
     )
 
-    command = AlertCommand(report_schedule=mocker.Mock())
+    command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())
 
     command.validate()
 
@@ -140,7 +221,7 @@ def test_execute_query_succeeded_with_retries(
     execute_query_mock.side_effect = _mocked_execute_query
     execute_query_mock.__name__ = "mocked_execute_query"
 
-    command = AlertCommand(report_schedule=mocker.Mock())
+    command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())
 
     command.validate()
 
@@ -162,7 +243,7 @@ def test_execute_query_failed_no_retry(
     execute_query_mock.side_effect = _mocked_execute_query
     execute_query_mock.__name__ = "mocked_execute_query"
 
-    command = AlertCommand(report_schedule=mocker.Mock())
+    command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())
 
     with suppress(AlertQueryTimeout):
         command.validate()
@@ -184,9 +265,42 @@ def test_execute_query_failed_max_retries(
     execute_query_mock.side_effect = _mocked_execute_query
     execute_query_mock.__name__ = "mocked_execute_query"
 
-    command = AlertCommand(report_schedule=mocker.Mock())
+    command = AlertCommand(report_schedule=mocker.Mock(), execution_id=uuid.uuid4())
 
     with suppress(AlertQueryError):
         command.validate()
     # Should match the value defined in superset_test_config.py
     assert execute_query_mock.call_count == 3
+
+
+def test_get_alert_metadata_from_object(
+    mocker: MockerFixture,
+    app_context: AppContext,
+    get_user,
+) -> None:
+    from superset.commands.report.alert import AlertCommand
+    from superset.reports.models import ReportSchedule
+
+    app.config["ALERT_REPORTS_EXECUTE_AS"] = [ExecutorType.OWNER]
+
+    mock_database = mocker.MagicMock()
+    mock_exec_id = uuid.uuid4()
+    report_schedule = ReportSchedule(
+        created_by=get_user("admin"),
+        owners=[get_user("admin")],
+        type=ReportScheduleType.ALERT,
+        description="description",
+        crontab="0 9 * * *",
+        creation_method=ReportCreationMethod.ALERTS_REPORTS,
+        sql="SELECT 1",
+        grace_period=14400,
+        working_timeout=3600,
+        database=mock_database,
+        validator_config_json='{"op": "==", "threshold": 1}',
+    )
+
+    cm = AlertCommand(report_schedule=report_schedule, execution_id=mock_exec_id)
+    assert cm._get_alert_metadata_from_object() == {
+        "report_schedule_id": report_schedule.id,
+        "execution_id": mock_exec_id,
+    }

Reply via email to