This is an automated email from the ASF dual-hosted git repository.
bkyryliuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-superset.git
The following commit(s) were added to refs/heads/master by this push:
new 8b9292e fix: add retry to SQL-based alerting celery task (#10542)
8b9292e is described below
commit 8b9292ed057540aaae3fe1d09952345261e92bc9
Author: Jason Davis <[email protected]>
AuthorDate: Mon Aug 10 10:20:43 2020 -0700
fix: add retry to SQL-based alerting celery task (#10542)
* added retry and minimized SQLAlchemy object lifetimes
* pylint
* added try/except
* adjusted naming
* added scoped session
* update tests for dbsession
* added requested changes
* nit todo
Co-authored-by: Jason Davis <@dropbox.com>
---
superset/tasks/schedules.py | 74 ++++++++++++++++---------
tests/alerts_tests.py | 130 ++++++++++++++++++++++----------------------
2 files changed, 113 insertions(+), 91 deletions(-)
diff --git a/superset/tasks/schedules.py b/superset/tasks/schedules.py
index 4fd55aa..e297da2 100644
--- a/superset/tasks/schedules.py
+++ b/superset/tasks/schedules.py
@@ -47,12 +47,13 @@ from flask_login import login_user
from retry.api import retry_call
from selenium.common.exceptions import WebDriverException
from selenium.webdriver import chrome, firefox
-from sqlalchemy.orm import Session
+from sqlalchemy.exc import NoSuchColumnError, ResourceClosedError
from werkzeug.http import parse_cookie
from superset import app, db, security_manager, thumbnail_cache
from superset.extensions import celery_app
from superset.models.alerts import Alert, AlertLog
+from superset.models.core import Database
from superset.models.dashboard import Dashboard
from superset.models.schedules import (
EmailDeliveryType,
@@ -79,6 +80,7 @@ config = app.config
logger = logging.getLogger("tasks.email_reports")
logger.setLevel(logging.INFO)
+stats_logger = current_app.config["STATS_LOGGER"]
EMAIL_PAGE_RENDER_WAIT = config["EMAIL_PAGE_RENDER_WAIT"]
WEBDRIVER_BASEURL = config["WEBDRIVER_BASEURL"]
WEBDRIVER_BASEURL_USER_FRIENDLY = config["WEBDRIVER_BASEURL_USER_FRIENDLY"]
@@ -533,6 +535,11 @@ def schedule_email_report( # pylint:
disable=unused-argument
name="alerts.run_query",
bind=True,
soft_time_limit=config["EMAIL_ASYNC_TIME_LIMIT_SEC"],
+ # TODO: find cause of
https://github.com/apache/incubator-superset/issues/10530
+ # and remove retry
+ autoretry_for=(NoSuchColumnError, ResourceClosedError,),
+ retry_kwargs={"max_retries": 5},
+ retry_backoff=True,
)
def schedule_alert_query( # pylint: disable=unused-argument
task: Task,
@@ -542,24 +549,33 @@ def schedule_alert_query( # pylint:
disable=unused-argument
is_test_alert: Optional[bool] = False,
) -> None:
model_cls = get_scheduler_model(report_type)
- dbsession = db.create_scoped_session()
- schedule = dbsession.query(model_cls).get(schedule_id)
- # The user may have disabled the schedule. If so, ignore this
- if not schedule or not schedule.active:
- logger.info("Ignoring deactivated alert")
- return
+ try:
+ schedule = db.session.query(model_cls).get(schedule_id)
- if report_type == ScheduleType.alert:
- if is_test_alert and recipients:
- deliver_alert(schedule.id, recipients)
+ # The user may have disabled the schedule. If so, ignore this
+ if not schedule or not schedule.active:
+ logger.info("Ignoring deactivated alert")
return
- if run_alert_query(schedule.id, dbsession):
- # deliver_dashboard OR deliver_slice
- return
- else:
- raise RuntimeError("Unknown report type")
+ if report_type == ScheduleType.alert:
+ if is_test_alert and recipients:
+ deliver_alert(schedule.id, recipients)
+ return
+
+ if run_alert_query(
+ schedule.id, schedule.database_id, schedule.sql, schedule.label
+ ):
+ # deliver_dashboard OR deliver_slice
+ return
+ else:
+ raise RuntimeError("Unknown report type")
+ except NoSuchColumnError as column_error:
+ stats_logger.incr("run_alert_task.error.nosuchcolumnerror")
+ raise column_error
+ except ResourceClosedError as resource_error:
+ stats_logger.incr("run_alert_task.error.resourceclosederror")
+ raise resource_error
class AlertState:
@@ -618,23 +634,23 @@ def deliver_alert(alert_id: int, recipients:
Optional[str] = None) -> None:
_deliver_email(recipients, deliver_as_group, subject, body, data, images)
-def run_alert_query(alert_id: int, dbsession: Session) -> Optional[bool]:
+def run_alert_query(
+ alert_id: int, database_id: int, sql: str, label: str
+) -> Optional[bool]:
"""
Execute alert.sql and return value if any rows are returned
"""
- alert = db.session.query(Alert).get(alert_id)
-
- logger.info("Processing alert ID: %i", alert.id)
- database = alert.database
+ logger.info("Processing alert ID: %i", alert_id)
+ database = db.session.query(Database).get(database_id)
if not database:
logger.error("Alert database not preset")
return None
- if not alert.sql:
+ if not sql:
logger.error("Alert SQL not preset")
return None
- parsed_query = ParsedQuery(alert.sql)
+ parsed_query = ParsedQuery(sql)
sql = parsed_query.stripped()
state = None
@@ -642,27 +658,31 @@ def run_alert_query(alert_id: int, dbsession: Session) ->
Optional[bool]:
df = pd.DataFrame()
try:
- logger.info("Evaluating SQL for alert %s", alert)
+ logger.info("Evaluating SQL for alert <%s:%s>", alert_id, label)
df = database.get_df(sql)
except Exception as exc: # pylint: disable=broad-except
state = AlertState.ERROR
logging.exception(exc)
- logging.error("Failed at evaluating alert: %s (%s)", alert.label,
alert.id)
+ logging.error("Failed at evaluating alert: %s (%s)", label, alert_id)
dttm_end = datetime.utcnow()
+ last_eval_dttm = datetime.utcnow()
if state != AlertState.ERROR:
- alert.last_eval_dttm = datetime.utcnow()
if not df.empty:
# Looking for truthy cells
for row in df.to_records():
if any(row):
state = AlertState.TRIGGER
- deliver_alert(alert.id)
+ deliver_alert(alert_id)
break
if not state:
state = AlertState.PASS
+ db.session.commit()
+ alert = db.session.query(Alert).get(alert_id)
+ if state != AlertState.ERROR:
+ alert.last_eval_dttm = last_eval_dttm
alert.last_state = state
alert.logs.append(
AlertLog(
@@ -672,7 +692,7 @@ def run_alert_query(alert_id: int, dbsession: Session) ->
Optional[bool]:
state=state,
)
)
- dbsession.commit()
+ db.session.commit()
return None
diff --git a/tests/alerts_tests.py b/tests/alerts_tests.py
index c78847c..5749821 100644
--- a/tests/alerts_tests.py
+++ b/tests/alerts_tests.py
@@ -38,41 +38,42 @@ def setup_database():
slice_id = db.session.query(Slice).all()[0].id
database_id = utils.get_example_database().id
- alert1 = Alert(
- id=1,
- label="alert_1",
- active=True,
- crontab="*/1 * * * *",
- sql="SELECT 0",
- alert_type="email",
- slice_id=slice_id,
- database_id=database_id,
- )
- alert2 = Alert(
- id=2,
- label="alert_2",
- active=True,
- crontab="*/1 * * * *",
- sql="SELECT 55",
- alert_type="email",
- slice_id=slice_id,
- database_id=database_id,
- )
- alert3 = Alert(
- id=3,
- label="alert_3",
- active=False,
- crontab="*/1 * * * *",
- sql="UPDATE 55",
- alert_type="email",
- slice_id=slice_id,
- database_id=database_id,
- )
- alert4 = Alert(id=4, active=False, label="alert_4", database_id=-1)
- alert5 = Alert(id=5, active=False, label="alert_5",
database_id=database_id)
-
- for num in range(1, 6):
- eval(f"db.session.add(alert{num})")
+ alerts = [
+ Alert(
+ id=1,
+ label="alert_1",
+ active=True,
+ crontab="*/1 * * * *",
+ sql="SELECT 0",
+ alert_type="email",
+ slice_id=slice_id,
+ database_id=database_id,
+ ),
+ Alert(
+ id=2,
+ label="alert_2",
+ active=True,
+ crontab="*/1 * * * *",
+ sql="SELECT 55",
+ alert_type="email",
+ slice_id=slice_id,
+ database_id=database_id,
+ ),
+ Alert(
+ id=3,
+ label="alert_3",
+ active=False,
+ crontab="*/1 * * * *",
+ sql="UPDATE 55",
+ alert_type="email",
+ slice_id=slice_id,
+ database_id=database_id,
+ ),
+ Alert(id=4, active=False, label="alert_4", database_id=-1),
+ Alert(id=5, active=False, label="alert_5",
database_id=database_id),
+ ]
+
+ db.session.bulk_save_objects(alerts)
db.session.commit()
yield db.session
@@ -82,45 +83,46 @@ def setup_database():
@patch("superset.tasks.schedules.deliver_alert")
@patch("superset.tasks.schedules.logging.Logger.error")
-def test_run_alert_query(mock_error, mock_deliver, setup_database):
- database = setup_database
- run_alert_query(database.query(Alert).filter_by(id=1).one().id, database)
- alert1 = database.query(Alert).filter_by(id=1).one()
- assert mock_deliver.call_count == 0
- assert len(alert1.logs) == 1
- assert alert1.logs[0].alert_id == 1
- assert alert1.logs[0].state == "pass"
-
- run_alert_query(database.query(Alert).filter_by(id=2).one().id, database)
- alert2 = database.query(Alert).filter_by(id=2).one()
- assert mock_deliver.call_count == 1
- assert len(alert2.logs) == 1
- assert alert2.logs[0].alert_id == 2
- assert alert2.logs[0].state == "trigger"
-
- run_alert_query(database.query(Alert).filter_by(id=3).one().id, database)
- alert3 = database.query(Alert).filter_by(id=3).one()
- assert mock_deliver.call_count == 1
+def test_run_alert_query(mock_error, mock_deliver_alert, setup_database):
+ dbsession = setup_database
+
+ # Test passing alert with null SQL result
+ alert1 = dbsession.query(Alert).filter_by(id=1).one()
+ run_alert_query(alert1.id, alert1.database_id, alert1.sql, alert1.label)
+ assert mock_deliver_alert.call_count == 0
+ assert mock_error.call_count == 0
+
+ # Test passing alert with True SQL result
+ alert2 = dbsession.query(Alert).filter_by(id=2).one()
+ run_alert_query(alert2.id, alert2.database_id, alert2.sql, alert2.label)
+ assert mock_deliver_alert.call_count == 1
+ assert mock_error.call_count == 0
+
+ # Test passing alert with error in SQL query
+ alert3 = dbsession.query(Alert).filter_by(id=3).one()
+ run_alert_query(alert3.id, alert3.database_id, alert3.sql, alert3.label)
+ assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 2
- assert len(alert3.logs) == 1
- assert alert3.logs[0].alert_id == 3
- assert alert3.logs[0].state == "error"
- run_alert_query(database.query(Alert).filter_by(id=4).one().id, database)
- assert mock_deliver.call_count == 1
+ # Test passing alert with invalid database
+ alert4 = dbsession.query(Alert).filter_by(id=4).one()
+ run_alert_query(alert4.id, alert4.database_id, alert4.sql, alert4.label)
+ assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 3
- run_alert_query(database.query(Alert).filter_by(id=5).one().id, database)
- assert mock_deliver.call_count == 1
+ # Test passing alert with no SQL statement
+ alert5 = dbsession.query(Alert).filter_by(id=5).one()
+ run_alert_query(alert5.id, alert5.database_id, alert5.sql, alert5.label)
+ assert mock_deliver_alert.call_count == 1
assert mock_error.call_count == 4
@patch("superset.tasks.schedules.deliver_alert")
@patch("superset.tasks.schedules.run_alert_query")
def test_schedule_alert_query(mock_run_alert, mock_deliver_alert,
setup_database):
- database = setup_database
- active_alert = database.query(Alert).filter_by(id=1).one()
- inactive_alert = database.query(Alert).filter_by(id=3).one()
+ dbsession = setup_database
+ active_alert = dbsession.query(Alert).filter_by(id=1).one()
+ inactive_alert = dbsession.query(Alert).filter_by(id=3).one()
# Test that inactive alerts are no processed
schedule_alert_query(report_type=ScheduleType.alert,
schedule_id=inactive_alert.id)