This is an automated email from the ASF dual-hosted git repository.

michaelsmolina pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 0d53446562acfaae6014c8f03af376072326e255
Author: Jack Fragassi <[email protected]>
AuthorDate: Tue Sep 19 09:40:13 2023 -0700

    fix: Improve the reliability of alerts & reports (#25239)
    
    (cherry picked from commit f672d5da5cb9390b83176bb12c27ce7eeea3e8ae)
---
 superset/config.py                       |   5 ++
 superset/tasks/cron_util.py              |  14 +--
 superset/tasks/scheduler.py              |   9 +-
 tests/unit_tests/tasks/test_cron_util.py | 145 ++++++++++++++++++-------------
 4 files changed, 106 insertions(+), 67 deletions(-)

diff --git a/superset/config.py b/superset/config.py
index 98718e731e..3847555a05 100644
--- a/superset/config.py
+++ b/superset/config.py
@@ -904,6 +904,10 @@ DASHBOARD_AUTO_REFRESH_INTERVALS = [
     [86400, "24 hours"],
 ]
 
+# This is used as a workaround for the alerts & reports scheduler task to get the time
+# celery beat triggered it, see https://github.com/celery/celery/issues/6974 for details
+CELERY_BEAT_SCHEDULER_EXPIRES = timedelta(weeks=1)
+
 # Default celery config is to use SQLA as a broker, in a production setting
 # you'll want to use a proper broker as specified here:
 # http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html
@@ -932,6 +936,7 @@ class CeleryConfig:  # pylint: disable=too-few-public-methods
         "reports.scheduler": {
             "task": "reports.scheduler",
             "schedule": crontab(minute="*", hour="*"),
+            "options": {"expires": int(CELERY_BEAT_SCHEDULER_EXPIRES.total_seconds())},
         },
         "reports.prune_log": {
             "task": "reports.prune_log",
diff --git a/superset/tasks/cron_util.py b/superset/tasks/cron_util.py
index 19d342ebdc..329937fb82 100644
--- a/superset/tasks/cron_util.py
+++ b/superset/tasks/cron_util.py
@@ -17,7 +17,7 @@
 
 import logging
 from collections.abc import Iterator
-from datetime import datetime, timedelta, timezone as dt_timezone
+from datetime import datetime, timedelta
 
 from croniter import croniter
 from pytz import timezone as pytz_timezone, UnknownTimeZoneError
@@ -27,10 +27,10 @@ from superset import app
 logger = logging.getLogger(__name__)
 
 
-def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
+def cron_schedule_window(
+    triggered_at: datetime, cron: str, timezone: str
+) -> Iterator[datetime]:
     window_size = app.config["ALERT_REPORTS_CRON_WINDOW_SIZE"]
-    # create a time-aware datetime in utc
-    time_now = datetime.now(tz=dt_timezone.utc)
     try:
         tz = pytz_timezone(timezone)
     except UnknownTimeZoneError:
@@ -39,9 +39,9 @@ def cron_schedule_window(cron: str, timezone: str) -> Iterator[datetime]:
         logger.warning("Timezone %s was invalid. Falling back to 'UTC'", timezone)
     utc = pytz_timezone("UTC")
     # convert the current time to the user's local time for comparison
-    time_now = time_now.astimezone(tz)
-    start_at = time_now - timedelta(seconds=1)
-    stop_at = time_now + timedelta(seconds=window_size)
+    time_now = triggered_at.astimezone(tz)
+    start_at = time_now - timedelta(seconds=window_size / 2)
+    stop_at = time_now + timedelta(seconds=window_size / 2)
     crons = croniter(cron, start_at)
     for schedule in crons.all_next(datetime):
         if schedule >= stop_at:
diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py
index a84036c340..9c541605c7 100644
--- a/superset/tasks/scheduler.py
+++ b/superset/tasks/scheduler.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import logging
+from datetime import datetime
 
 from celery import Celery
 from celery.exceptions import SoftTimeLimitExceeded
@@ -43,9 +44,15 @@ def scheduler() -> None:
         return
     with session_scope(nullpool=True) as session:
         active_schedules = ReportScheduleDAO.find_active(session)
+        triggered_at = (
+            datetime.fromisoformat(scheduler.request.expires)
+            - app.config["CELERY_BEAT_SCHEDULER_EXPIRES"]
+            if scheduler.request.expires
+            else datetime.utcnow()
+        )
         for active_schedule in active_schedules:
             for schedule in cron_schedule_window(
-                active_schedule.crontab, active_schedule.timezone
+                triggered_at, active_schedule.crontab, active_schedule.timezone
             ):
                 logger.info(
                     "Scheduling alert %s eta: %s", active_schedule.name, schedule
diff --git a/tests/unit_tests/tasks/test_cron_util.py b/tests/unit_tests/tasks/test_cron_util.py
index 5bc22273f5..56f1258e30 100644
--- a/tests/unit_tests/tasks/test_cron_util.py
+++ b/tests/unit_tests/tasks/test_cron_util.py
@@ -14,11 +14,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from datetime import datetime
 
 import pytest
-import pytz
-from dateutil import parser
-from freezegun import freeze_time
 from freezegun.api import FakeDatetime
 
 from superset.tasks.cron_util import cron_schedule_window
@@ -27,23 +25,28 @@ from superset.tasks.cron_util import cron_schedule_window
 @pytest.mark.parametrize(
     "current_dttm, cron, expected",
     [
-        ("2020-01-01T08:59:01Z", "0 1 * * *", []),
+        ("2020-01-01T08:59:01+00:00", "0 1 * * *", []),
         (
-            "2020-01-01T08:59:02Z",
+            "2020-01-01T08:59:32+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-01-01T08:59:59Z",
+            "2020-01-01T08:59:59+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-01-01T09:00:00Z",
+            "2020-01-01T09:00:00+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
-        ("2020-01-01T09:00:01Z", "0 1 * * *", []),
+        (
+            "2020-01-01T09:00:01+00:00",
+            "0 1 * * *",
+            [FakeDatetime(2020, 1, 1, 9, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
+        ),
+        ("2020-01-01T09:00:30+00:00", "0 1 * * *", []),
     ],
 )
 def test_cron_schedule_window_los_angeles(
@@ -53,34 +56,40 @@ def test_cron_schedule_window_los_angeles(
     Reports scheduler: Test cron schedule window for "America/Los_Angeles"
     """
 
-    with freeze_time(current_dttm):
-        datetimes = cron_schedule_window(cron, "America/Los_Angeles")
-        assert (
-            list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
-            == expected
-        )
+    datetimes = cron_schedule_window(
+        datetime.fromisoformat(current_dttm), cron, "America/Los_Angeles"
+    )
+    assert (
+        list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
+    )
 
 
 @pytest.mark.parametrize(
     "current_dttm, cron, expected",
     [
-        ("2020-01-01T00:59:01Z", "0 1 * * *", []),
+        ("2020-01-01T00:59:01+00:00", "0 1 * * *", []),
+        ("2020-01-01T00:59:02+00:00", "0 1 * * *", []),
+        (
+            "2020-01-01T00:59:59+00:00",
+            "0 1 * * *",
+            [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
+        ),
         (
-            "2020-01-01T00:59:02Z",
+            "2020-01-01T01:00:00+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-01-01T00:59:59Z",
+            "2020-01-01T01:00:01+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-01-01T01:00:00Z",
+            "2020-01-01T01:00:29+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 1, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
-        ("2020-01-01T01:00:01Z", "0 1 * * *", []),
+        ("2020-01-01T01:00:30+00:00", "0 1 * * *", []),
     ],
 )
 def test_cron_schedule_window_invalid_timezone(
@@ -90,35 +99,41 @@ def test_cron_schedule_window_invalid_timezone(
     Reports scheduler: Test cron schedule window for "invalid timezone"
     """
 
-    with freeze_time(current_dttm):
-        datetimes = cron_schedule_window(cron, "invalid timezone")
-        # it should default to UTC
-        assert (
-            list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
-            == expected
-        )
+    datetimes = cron_schedule_window(
+        datetime.fromisoformat(current_dttm), cron, "invalid timezone"
+    )
+    # it should default to UTC
+    assert (
+        list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
+    )
 
 
 @pytest.mark.parametrize(
     "current_dttm, cron, expected",
     [
-        ("2020-01-01T05:59:01Z", "0 1 * * *", []),
+        ("2020-01-01T05:59:01+00:00", "0 1 * * *", []),
+        ("2020-01-01T05:59:02+00:00", "0 1 * * *", []),
+        (
+            "2020-01-01T05:59:59+00:00",
+            "0 1 * * *",
+            [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
+        ),
         (
-            "2020-01-01T05:59:02Z",
+            "2020-01-01T06:00:00+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-01-01T5:59:59Z",
+            "2020-01-01T06:00:01+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-01-01T6:00:00",
+            "2020-01-01T06:00:29+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
-        ("2020-01-01T6:00:01Z", "0 1 * * *", []),
+        ("2020-01-01T06:00:30+00:00", "0 1 * * *", []),
     ],
 )
 def test_cron_schedule_window_new_york(
@@ -128,34 +143,40 @@ def test_cron_schedule_window_new_york(
     Reports scheduler: Test cron schedule window for "America/New_York"
     """
 
-    with freeze_time(current_dttm, tz_offset=0):
-        datetimes = cron_schedule_window(cron, "America/New_York")
-        assert (
-            list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
-            == expected
-        )
+    datetimes = cron_schedule_window(
+        datetime.fromisoformat(current_dttm), cron, "America/New_York"
+    )
+    assert (
+        list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
+    )
 
 
 @pytest.mark.parametrize(
     "current_dttm, cron, expected",
     [
-        ("2020-01-01T06:59:01Z", "0 1 * * *", []),
+        ("2020-01-01T06:59:01+00:00", "0 1 * * *", []),
+        ("2020-01-01T06:59:02+00:00", "0 1 * * *", []),
+        (
+            "2020-01-01T06:59:59+00:00",
+            "0 1 * * *",
+            [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
+        ),
         (
-            "2020-01-01T06:59:02Z",
+            "2020-01-01T07:00:00+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-01-01T06:59:59Z",
+            "2020-01-01T07:00:01+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-01-01T07:00:00",
+            "2020-01-01T07:00:29+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 1, 1, 7, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
-        ("2020-01-01T07:00:01Z", "0 1 * * *", []),
+        ("2020-01-01T07:00:30+00:00", "0 1 * * *", []),
     ],
 )
 def test_cron_schedule_window_chicago(
@@ -165,34 +186,40 @@ def test_cron_schedule_window_chicago(
     Reports scheduler: Test cron schedule window for "America/Chicago"
     """
 
-    with freeze_time(current_dttm, tz_offset=0):
-        datetimes = cron_schedule_window(cron, "America/Chicago")
-        assert (
-            list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
-            == expected
-        )
+    datetimes = cron_schedule_window(
+        datetime.fromisoformat(current_dttm), cron, "America/Chicago"
+    )
+    assert (
+        list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
+    )
 
 
 @pytest.mark.parametrize(
     "current_dttm, cron, expected",
     [
-        ("2020-07-01T05:59:01Z", "0 1 * * *", []),
+        ("2020-07-01T05:59:01+00:00", "0 1 * * *", []),
+        ("2020-07-01T05:59:02+00:00", "0 1 * * *", []),
+        (
+            "2020-07-01T05:59:59+00:00",
+            "0 1 * * *",
+            [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
+        ),
         (
-            "2020-07-01T05:59:02Z",
+            "2020-07-01T06:00:00+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-07-01T05:59:59Z",
+            "2020-07-01T06:00:01+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
         (
-            "2020-07-01T06:00:00",
+            "2020-07-01T06:00:29+00:00",
             "0 1 * * *",
             [FakeDatetime(2020, 7, 1, 6, 0).strftime("%A, %d %B %Y, %H:%M:%S")],
         ),
-        ("2020-07-01T06:00:01Z", "0 1 * * *", []),
+        ("2020-07-01T06:00:30+00:00", "0 1 * * *", []),
     ],
 )
 def test_cron_schedule_window_chicago_daylight(
@@ -202,9 +229,9 @@ def test_cron_schedule_window_chicago_daylight(
     Reports scheduler: Test cron schedule window for "America/Chicago"
     """
 
-    with freeze_time(current_dttm, tz_offset=0):
-        datetimes = cron_schedule_window(cron, "America/Chicago")
-        assert (
-            list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes)
-            == expected
-        )
+    datetimes = cron_schedule_window(
+        datetime.fromisoformat(current_dttm), cron, "America/Chicago"
+    )
+    assert (
+        list(cron.strftime("%A, %d %B %Y, %H:%M:%S") for cron in datetimes) == expected
+    )

Reply via email to