This is an automated email from the ASF dual-hosted git repository.

bkyryliuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b9292e  fix: add retry to SQL-based alerting celery task (#10542)
8b9292e is described below

commit 8b9292ed057540aaae3fe1d09952345261e92bc9
Author: Jason Davis <[email protected]>
AuthorDate: Mon Aug 10 10:20:43 2020 -0700

    fix: add retry to SQL-based alerting celery task (#10542)
    
    * added retry and minimized sqlalchemy object lives
    
    * pylint
    
    * added try catch
    
    * adjusted naming
    
    * added scoped session
    
    * update tests for dbsession
    
    * added requested changes
    
    * nit todo
    
    Co-authored-by: Jason Davis <@dropbox.com>
---
 superset/tasks/schedules.py |  74 ++++++++++++++++---------
 tests/alerts_tests.py       | 130 ++++++++++++++++++++++----------------------
 2 files changed, 113 insertions(+), 91 deletions(-)

diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py
index 4fd55aa..e297da2 100644
--- a/superset/tasks/schedules.py
+++ b/superset/tasks/schedules.py
@@ -47,12 +47,13 @@ from flask_login import login_user
 from retry.api import retry_call
 from selenium.common.exceptions import WebDriverException
 from selenium.webdriver import chrome, firefox
-from sqlalchemy.orm import Session
+from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError
 from werkzeug.http import parse_cookie
 
 from superset import app, db, security_manager, thumbnail_cache
 from superset.extensions import celery_app
 from superset.models.alerts import Alert, AlertLog
+from superset.models.core import Database
 from superset.models.dashboard import Dashboard
 from superset.models.schedules import (
     EmailDeliveryType,
@@ -79,6 +80,7 @@ config = app.config
 logger = logging.getLogger("tasks.email_reports")
 logger.setLevel(logging.INFO)
 
+stats_logger = current_app.config["STATS_LOGGER"]
 EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"]
 WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"]
 WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"]
@@ -533,6 +535,11 @@ def schedule_email_report(  # pylint: disable=unused-argument
     name="alerts.run_query",
     bind=True,
     soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
+    # TODO: find cause of https://github.com/apache/incubator-superset/issues/10530
+    # and remove retry
+    autoretry_for=(NoSuchColumnError, ResourceClosedError,),
+    retry_kwargs={"max_retries": 5},
+    retry_backoff=True,
 )
 def schedule_alert_query(  # pylint: disable=unused-argument
     task: Task,
@@ -542,24 +549,33 @@ def schedule_alert_query(  # pylint: disable=unused-argument
     is_test_alert: Optional[bool] = False,
 ) -> None:
     model_cls = get_scheduler_model(report_type)
-    dbsession = db.create_scoped_session()
-    schedule = dbsession.query(model_cls).get(schedule_id)
 
-    # The user may have disabled the schedule. If so, ignore this
-    if not schedule or not schedule.active:
-        logger.info("Ignoring deactivated alert")
-        return
+    try:
+        schedule = db.session.query(model_cls).get(schedule_id)
 
-    if report_type == ScheduleType.alert:
-        if is_test_alert and recipients:
-            deliver_alert(schedule.id, recipients)
+        # The user may have disabled the schedule. If so, ignore this
+        if not schedule or not schedule.active:
+            logger.info("Ignoring deactivated alert")
             return
 
-        if run_alert_query(schedule.id, dbsession):
-            # deliver_dashboard OR deliver_slice
-            return
-    else:
-        raise RuntimeError("Unknown report type")
+        if report_type == ScheduleType.alert:
+            if is_test_alert and recipients:
+                deliver_alert(schedule.id, recipients)
+                return
+
+            if run_alert_query(
+                schedule.id, schedule.database_id, schedule.sql, schedule.label
+            ):
+                # deliver_dashboard OR deliver_slice
+                return
+        else:
+            raise RuntimeError("Unknown report type")
+    except NoSuchColumnError as column_error:
+        stats_logger.incr("run_alert_task.error.nosuchcolumnerror")
+        raise column_error
+    except ResourceClosedError as resource_error:
+        stats_logger.incr("run_alert_task.error.resourceclosederror")
+        raise resource_error
 
 
 class AlertState:
@@ -618,23 +634,23 @@ def deliver_alert(alert_id: int, recipients: Optional[str] = None) -> None:
     _deliver_email(recipients, deliver_as_group, subject, body, data, images)
 
 
-def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
+def run_alert_query(
+    alert_id: int, database_id: int, sql: str, label: str
+) -> Optional[bool]:
     """
     Execute alert.sql and return value if any rows are returned
     """
-    alert = db.session.query(Alert).get(alert_id)
-
-    logger.info("Processing alert ID: %i", alert.id)
-    database = alert.database
+    logger.info("Processing alert ID: %i", alert_id)
+    database = db.session.query(Database).get(database_id)
     if not database:
         logger.error("Alert database not preset")
         return None
 
-    if not alert.sql:
+    if not sql:
         logger.error("Alert SQL not preset")
         return None
 
-    parsed_query = ParsedQuery(alert.sql)
+    parsed_query = ParsedQuery(sql)
     sql = parsed_query.stripped()
 
     state = None
@@ -642,27 +658,31 @@ def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
 
     df = pd.DataFrame()
     try:
-        logger.info("Evaluating SQL for alert %s", alert)
+        logger.info("Evaluating SQL for alert <%s:%s>", alert_id, label)
         df = database.get_df(sql)
     except Exception as exc:  # pylint: disable=broad-except
         state = AlertState.ERROR
         logging.exception(exc)
-        logging.error("Failed at evaluating alert: %s (%s)", alert.label, alert.id)
+        logging.error("Failed at evaluating alert: %s (%s)", label, alert_id)
 
     dttm_end = datetime.utcnow()
+    last_eval_dttm = datetime.utcnow()
 
     if state != AlertState.ERROR:
-        alert.last_eval_dttm = datetime.utcnow()
         if not df.empty:
             # Looking for truthy cells
             for row in df.to_records():
                 if any(row):
                     state = AlertState.TRIGGER
-                    deliver_alert(alert.id)
+                    deliver_alert(alert_id)
                     break
         if not state:
             state = AlertState.PASS
 
+    db.session.commit()
+    alert = db.session.query(Alert).get(alert_id)
+    if state != AlertState.ERROR:
+        alert.last_eval_dttm = last_eval_dttm
     alert.last_state = state
     alert.logs.append(
         AlertLog(
@@ -672,7 +692,7 @@ def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
             state=state,
         )
     )
-    dbsession.commit()
+    db.session.commit()
 
     return None
 
diff --git a/tests/alerts_tests.py b/tests/alerts_tests.py
index c78847c..5749821 100644
--- a/tests/alerts_tests.py
+++ b/tests/alerts_tests.py
@@ -38,41 +38,42 @@ def setup_database():
         slice_id = db.session.query(Slice).all()[0].id
         database_id = utils.get_example_database().id
 
-        alert1 = Alert(
-            id=1,
-            label="alert_1",
-            active=True,
-            crontab="*/1 * * * *",
-            sql="SELECT 0",
-            alert_type="email",
-            slice_id=slice_id,
-            database_id=database_id,
-        )
-        alert2 = Alert(
-            id=2,
-            label="alert_2",
-            active=True,
-            crontab="*/1 * * * *",
-            sql="SELECT 55",
-            alert_type="email",
-            slice_id=slice_id,
-            database_id=database_id,
-        )
-        alert3 = Alert(
-            id=3,
-            label="alert_3",
-            active=False,
-            crontab="*/1 * * * *",
-            sql="UPDATE 55",
-            alert_type="email",
-            slice_id=slice_id,
-            database_id=database_id,
-        )
-        alert4 = Alert(id=4, active=False, label="alert_4", database_id=-1)
-        alert5 = Alert(id=5, active=False, label="alert_5", database_id=database_id)
-
-        for num in range(1, 6):
-            eval(f"db.session.add(alert{num})")
+        alerts = [
+            Alert(
+                id=1,
+                label="alert_1",
+                active=True,
+                crontab="*/1 * * * *",
+                sql="SELECT 0",
+                alert_type="email",
+                slice_id=slice_id,
+                database_id=database_id,
+            ),
+            Alert(
+                id=2,
+                label="alert_2",
+                active=True,
+                crontab="*/1 * * * *",
+                sql="SELECT 55",
+                alert_type="email",
+                slice_id=slice_id,
+                database_id=database_id,
+            ),
+            Alert(
+                id=3,
+                label="alert_3",
+                active=False,
+                crontab="*/1 * * * *",
+                sql="UPDATE 55",
+                alert_type="email",
+                slice_id=slice_id,
+                database_id=database_id,
+            ),
+            Alert(id=4, active=False, label="alert_4", database_id=-1),
+            Alert(id=5, active=False, label="alert_5", database_id=database_id),
+        ]
+
+        db.session.bulk_save_objects(alerts)
         db.session.commit()
         yield db.session
 
@@ -82,45 +83,46 @@ def setup_database():
 
 @patch("superset.tasks.schedules.deliver_alert")
 @patch("superset.tasks.schedules.logging.Logger.error")
-def test_run_alert_query(mock_error, mock_deliver, setup_database):
-    database = setup_database
-    run_alert_query(database.query(Alert).filter_by(id=1).one().id, database)
-    alert1 = database.query(Alert).filter_by(id=1).one()
-    assert mock_deliver.call_count == 0
-    assert len(alert1.logs) == 1
-    assert alert1.logs[0].alert_id == 1
-    assert alert1.logs[0].state == "pass"
-
-    run_alert_query(database.query(Alert).filter_by(id=2).one().id, database)
-    alert2 = database.query(Alert).filter_by(id=2).one()
-    assert mock_deliver.call_count == 1
-    assert len(alert2.logs) == 1
-    assert alert2.logs[0].alert_id == 2
-    assert alert2.logs[0].state == "trigger"
-
-    run_alert_query(database.query(Alert).filter_by(id=3).one().id, database)
-    alert3 = database.query(Alert).filter_by(id=3).one()
-    assert mock_deliver.call_count == 1
+def test_run_alert_query(mock_error, mock_deliver_alert, setup_database):
+    dbsession = setup_database
+
+    # Test passing alert with null SQL result
+    alert1 = dbsession.query(Alert).filter_by(id=1).one()
+    run_alert_query(alert1.id, alert1.database_id, alert1.sql, alert1.label)
+    assert mock_deliver_alert.call_count == 0
+    assert mock_error.call_count == 0
+
+    # Test passing alert with True SQL result
+    alert2 = dbsession.query(Alert).filter_by(id=2).one()
+    run_alert_query(alert2.id, alert2.database_id, alert2.sql, alert2.label)
+    assert mock_deliver_alert.call_count == 1
+    assert mock_error.call_count == 0
+
+    # Test passing alert with error in SQL query
+    alert3 = dbsession.query(Alert).filter_by(id=3).one()
+    run_alert_query(alert3.id, alert3.database_id, alert3.sql, alert3.label)
+    assert mock_deliver_alert.call_count == 1
     assert mock_error.call_count == 2
-    assert len(alert3.logs) == 1
-    assert alert3.logs[0].alert_id == 3
-    assert alert3.logs[0].state == "error"
 
-    run_alert_query(database.query(Alert).filter_by(id=4).one().id, database)
-    assert mock_deliver.call_count == 1
+    # Test passing alert with invalid database
+    alert4 = dbsession.query(Alert).filter_by(id=4).one()
+    run_alert_query(alert4.id, alert4.database_id, alert4.sql, alert4.label)
+    assert mock_deliver_alert.call_count == 1
     assert mock_error.call_count == 3
 
-    run_alert_query(database.query(Alert).filter_by(id=5).one().id, database)
-    assert mock_deliver.call_count == 1
+    # Test passing alert with no SQL statement
+    alert5 = dbsession.query(Alert).filter_by(id=5).one()
+    run_alert_query(alert5.id, alert5.database_id, alert5.sql, alert5.label)
+    assert mock_deliver_alert.call_count == 1
     assert mock_error.call_count == 4
 
 
 @patch("superset.tasks.schedules.deliver_alert")
 @patch("superset.tasks.schedules.run_alert_query")
 def test_schedule_alert_query(mock_run_alert, mock_deliver_alert, setup_database):
-    database = setup_database
-    active_alert = database.query(Alert).filter_by(id=1).one()
-    inactive_alert = database.query(Alert).filter_by(id=3).one()
+    dbsession = setup_database
+    active_alert = dbsession.query(Alert).filter_by(id=1).one()
+    inactive_alert = dbsession.query(Alert).filter_by(id=3).one()
 
     # Test that inactive alerts are no processed
     schedule_alert_query(report_type=ScheduleType.alert, schedule_id=inactive_alert.id)

Reply via email to