This is an automated email from the ASF dual-hosted git repository.
elizabeth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new f672d5da5c fix: Improve the reliability of alerts & reports (#25239)
f672d5da5c is described below
commit f672d5da5cb9390b83176bb12c27ce7eeea3e8ae
Author: Jack Fragassi <[email protected]>
AuthorDate: Tue Sep 19 09:40:13 2023 -0700
fix: Improve the reliability of alerts & reports (#25239)
---
superset/config.py | 5 ++
superset/tasks/cron_util.py | 14 +--
superset/tasks/scheduler.py | 9 +-
tests/unit_tests/tasks/test_cron_util.py | 145 ++++++++++++++++++-------------
4 files changed, 106 insertions(+), 67 deletions(-)
diff --git a/superset/config.py b/superset/config.py
index 18a6157578..1145a7693f 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -914,6 +914,10 @@ DASHBOARD_AUTO_REFRESH_INTERVALS = [
[86400, "24 hours"],
]
+# This is used as a workaround for the alerts & reports scheduler task to get the time
+# celery beat triggered it, see https://github.com/celery/celery/issues/6974 for details
+CELERY_BEAT_SCHEDULER_EXPIRES = timedelta(weeks=1)
+
# Default celery config is to use SQLA as a broker, in a production setting
# you'll want to use a proper broker as specified here:
# https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html
@@ -942,6 +946,7 @@ class CeleryConfig: # pylint:
disable=too-few-public-methods
"reports.scheduler": {
"task": "reports.scheduler",
"schedule": crontab(minute="*", hour="*"),
+ "options": {"expires":
int(CELERY_BEAT_SCHEDULER_EXPIRES.total_seconds())},
},
"reports.prune_log": {
"task": "reports.prune_log",
diff --git a/superset/tasks/cron_util.py b/superset/tasks/cron_util.py
index 19d342ebdc..329937fb82 100644
--- a/superset/tasks/cron_util.py
+++ b/superset/tasks/cron_util.py
@@ -17,7 +17,7 @@
import logging
from collections.abc import Iterator
-from datetime import datetime, timedelta, timezone as dt_timezone
+from datetime import datetime, timedelta
from croniter import croniter
from pytz import timezone as pytz_timezone, UnknownTimeZoneError
@@ -27,10 +27,10 @@ from superset import app
logger = logging.getLogger(__name__)
-def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
+def cron_schedule_window(
+ triggered_at: datetime, cron: str, timezone: str
+) -> Iterator[datetime]:
window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"]
- # create a time-aware datetime in utc
- time_now = datetime.now(tz=dt_timezone.utc)
try:
tz = pytz_timezone(timezone)
except UnknownTimeZoneError:
@@ -39,9 +39,9 @@ def cron_schedule_window(cron: str, timezone: str) ->
Iterator[datetime]:
logger.warning("Timezone %s was invalid. Falling back to 'UTC'",
timezone)
utc = pytz_timezone("UTC")
# convert the current time to the user's local time for comparison
- time_now = time_now.astimezone(tz)
- start_at = time_now - timedelta(seconds=1)
- stop_at = time_now + timedelta(seconds=window_size)
+ time_now = triggered_at.astimezone(tz)
+ start_at = time_now - timedelta(seconds=window_size / 2)
+ stop_at = time_now + timedelta(seconds=window_size / 2)
crons = croniter(cron, start_at)
for schedule in crons.all_next(datetime):
if schedule >= stop_at:
diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py
index 90df90ff15..f3cc270b86 100644
--- a/superset/tasks/scheduler.py
+++ b/superset/tasks/scheduler.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import logging
+from datetime import datetime
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
@@ -47,9 +48,15 @@ def scheduler() -> None:
return
with session_scope(nullpool=True) as session:
active_schedules = ReportScheduleDAO.find_active(session)
+ triggered_at = (
+ datetime.fromisoformat(scheduler.request.expires)
+ - app.config["CELERY_BEAT_SCHEDULER_EXPIRES"]
+ if scheduler.request.expires
+ else datetime.utcnow()
+ )
for active_schedule in active_schedules:
for schedule in cron_schedule_window(
- active_schedule.crontab, active_schedule.timezone
+ triggered_at, active_schedule.crontab, active_schedule.timezone
):
logger.info(
"Scheduling alert %s eta: %s", active_schedule.name,
schedule
diff --git a/tests/unit_tests/tasks/test_cron_util.py
b/tests/unit_tests/tasks/test_cron_util.py
index 5bc22273f5..56f1258e30 100644
--- a/tests/unit_tests/tasks/test_cron_util.py
+++ b/tests/unit_tests/tasks/test_cron_util.py
@@ -14,11 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from datetime import datetime
import pytest
-import pytz
-from dateutil import parser
-from freezegun import freeze_time
from freezegun.api import FakeDatetime
from superset.tasks.cron_util import cron_schedule_window
@@ -27,23 +25,28 @@ from superset.tasks.cron_util import cron_schedule_window
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
- ("2020-01-01T08:59:01Z", "0 1 * * *", []),
+ ("2020-01-01T08:59:01+00:00", "0 1 * * *", []),
(
- "2020-01-01T08:59:02Z",
+ "2020-01-01T08:59:32+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-01-01T08:59:59Z",
+ "2020-01-01T08:59:59+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-01-01T09:00:00Z",
+ "2020-01-01T09:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
- ("2020-01-01T09:00:01Z", "0 1 * * *", []),
+ (
+ "2020-01-01T09:00:01+00:00",
+ "0 1 * * *",
+ [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
+ ),
+ ("2020-01-01T09:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_los_angeles(
@@ -53,34 +56,40 @@ def test_cron_schedule_window_los_angeles(
Reports scheduler: Test cron schedule window for "America/Los_Angeles"
"""
- with freeze_time(current_dttm):
- datetimes = cron_schedule_window(cron, "America/Los_Angeles")
- assert (
- list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
- == expected
- )
+ datetimes = cron_schedule_window(
+ datetime.fromisoformat(current_dttm), cron, "America/Los_Angeles"
+ )
+ assert (
+ list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) ==
expected
+ )
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
- ("2020-01-01T00:59:01Z", "0 1 * * *", []),
+ ("2020-01-01T00:59:01+00:00", "0 1 * * *", []),
+ ("2020-01-01T00:59:02+00:00", "0 1 * * *", []),
+ (
+ "2020-01-01T00:59:59+00:00",
+ "0 1 * * *",
+ [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
+ ),
(
- "2020-01-01T00:59:02Z",
+ "2020-01-01T01:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-01-01T00:59:59Z",
+ "2020-01-01T01:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-01-01T01:00:00Z",
+ "2020-01-01T01:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
- ("2020-01-01T01:00:01Z", "0 1 * * *", []),
+ ("2020-01-01T01:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_invalid_timezone(
@@ -90,35 +99,41 @@ def test_cron_schedule_window_invalid_timezone(
Reports scheduler: Test cron schedule window for "invalid timezone"
"""
- with freeze_time(current_dttm):
- datetimes = cron_schedule_window(cron, "invalid timezone")
- # it should default to UTC
- assert (
- list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
- == expected
- )
+ datetimes = cron_schedule_window(
+ datetime.fromisoformat(current_dttm), cron, "invalid timezone"
+ )
+ # it should default to UTC
+ assert (
+ list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) ==
expected
+ )
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
- ("2020-01-01T05:59:01Z", "0 1 * * *", []),
+ ("2020-01-01T05:59:01+00:00", "0 1 * * *", []),
+ ("2020-01-01T05:59:02+00:00", "0 1 * * *", []),
+ (
+ "2020-01-01T05:59:59+00:00",
+ "0 1 * * *",
+ [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
+ ),
(
- "2020-01-01T05:59:02Z",
+ "2020-01-01T06:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-01-01T5:59:59Z",
+ "2020-01-01T06:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-01-01T6:00:00",
+ "2020-01-01T06:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
- ("2020-01-01T6:00:01Z", "0 1 * * *", []),
+ ("2020-01-01T06:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_new_york(
@@ -128,34 +143,40 @@ def test_cron_schedule_window_new_york(
Reports scheduler: Test cron schedule window for "America/New_York"
"""
- with freeze_time(current_dttm, tz_offset=0):
- datetimes = cron_schedule_window(cron, "America/New_York")
- assert (
- list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
- == expected
- )
+ datetimes = cron_schedule_window(
+ datetime.fromisoformat(current_dttm), cron, "America/New_York"
+ )
+ assert (
+ list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) ==
expected
+ )
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
- ("2020-01-01T06:59:01Z", "0 1 * * *", []),
+ ("2020-01-01T06:59:01+00:00", "0 1 * * *", []),
+ ("2020-01-01T06:59:02+00:00", "0 1 * * *", []),
+ (
+ "2020-01-01T06:59:59+00:00",
+ "0 1 * * *",
+ [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
+ ),
(
- "2020-01-01T06:59:02Z",
+ "2020-01-01T07:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-01-01T06:59:59Z",
+ "2020-01-01T07:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-01-01T07:00:00",
+ "2020-01-01T07:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
- ("2020-01-01T07:00:01Z", "0 1 * * *", []),
+ ("2020-01-01T07:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_chicago(
@@ -165,34 +186,40 @@ def test_cron_schedule_window_chicago(
Reports scheduler: Test cron schedule window for "America/Chicago"
"""
- with freeze_time(current_dttm, tz_offset=0):
- datetimes = cron_schedule_window(cron, "America/Chicago")
- assert (
- list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
- == expected
- )
+ datetimes = cron_schedule_window(
+ datetime.fromisoformat(current_dttm), cron, "America/Chicago"
+ )
+ assert (
+ list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) ==
expected
+ )
@pytest.mark.parametrize(
"current_dttm, cron, expected",
[
- ("2020-07-01T05:59:01Z", "0 1 * * *", []),
+ ("2020-07-01T05:59:01+00:00", "0 1 * * *", []),
+ ("2020-07-01T05:59:02+00:00", "0 1 * * *", []),
+ (
+ "2020-07-01T05:59:59+00:00",
+ "0 1 * * *",
+ [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
+ ),
(
- "2020-07-01T05:59:02Z",
+ "2020-07-01T06:00:00+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-07-01T05:59:59Z",
+ "2020-07-01T06:00:01+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
(
- "2020-07-01T06:00:00",
+ "2020-07-01T06:00:29+00:00",
"0 1 * * *",
[FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y,
%H:%M:%S")],
),
- ("2020-07-01T06:00:01Z", "0 1 * * *", []),
+ ("2020-07-01T06:00:30+00:00", "0 1 * * *", []),
],
)
def test_cron_schedule_window_chicago_daylight(
@@ -202,9 +229,9 @@ def test_cron_schedule_window_chicago_daylight(
Reports scheduler: Test cron schedule window for "America/Chicago"
"""
- with freeze_time(current_dttm, tz_offset=0):
- datetimes = cron_schedule_window(cron, "America/Chicago")
- assert (
- list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
- == expected
- )
+ datetimes = cron_schedule_window(
+ datetime.fromisoformat(current_dttm), cron, "America/Chicago"
+ )
+ assert (
+ list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) ==
expected
+ )