This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c4549d721f Account for change in UTC offset when calculating next schedule (#35887)
c4549d721f is described below

commit c4549d721f3b8127e023f1953378484f02e5919b
Author: Tzu-ping Chung <uranu...@gmail.com>
AuthorDate: Wed Dec 6 15:40:34 2023 +0800

    Account for change in UTC offset when calculating next schedule (#35887)
    
    Co-authored-by: Daniel Standish <15932138+dstand...@users.noreply.github.com>
---
 airflow/models/dag.py                       |  25 ++-
 airflow/timetables/_cron.py                 |  44 ++--
 tests/timetables/test_interval_timetable.py | 305 ++++++++++++++++++++++++++++
 3 files changed, 351 insertions(+), 23 deletions(-)

diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index e93d5a55a0..654bc85cae 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -848,16 +848,35 @@ class DAG(LoggingMixin):
         return [info.logical_date for info in it]
 
     def is_fixed_time_schedule(self):
+        """Figures out if the schedule has a fixed time (e.g. 3 AM every day).
+
+        Detection is done by "peeking" at the next two cron trigger times; if the
+        two times have the same minute and hour value, the schedule is fixed,
+        and we *don't* need to perform the DST fix.
+
+        This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
+
+        Do not try to understand what this actually means. It is old logic that
+        should not be used anywhere.
+        """
         warnings.warn(
             "`DAG.is_fixed_time_schedule()` is deprecated.",
             category=RemovedInAirflow3Warning,
             stacklevel=2,
         )
-        try:
-            return not self.timetable._should_fix_dst
-        except AttributeError:
+
+        from airflow.timetables._cron import CronMixin
+
+        if not isinstance(self.timetable, CronMixin):
             return True
 
+        from croniter import croniter
+
+        cron = croniter(self.timetable._expression)
+        next_a = cron.get_next(datetime.datetime)
+        next_b = cron.get_next(datetime.datetime)
+        return next_b.minute == next_a.minute and next_b.hour == next_a.hour
+
     def following_schedule(self, dttm):
         """
         Calculate the following schedule for this dag in UTC.
diff --git a/airflow/timetables/_cron.py b/airflow/timetables/_cron.py
index f9b8efa465..9f5878b197 100644
--- a/airflow/timetables/_cron.py
+++ b/airflow/timetables/_cron.py
@@ -17,7 +17,6 @@
 from __future__ import annotations
 
 import datetime
-from functools import cached_property
 from typing import TYPE_CHECKING, Any
 
 from cron_descriptor import CasingTypeEnum, ExpressionDescriptor, FormatException, MissingFieldException
@@ -32,21 +31,32 @@ if TYPE_CHECKING:
     from pendulum import DateTime
 
 
-def _is_schedule_fixed(expression: str) -> bool:
-    """Figures out if the schedule has a fixed time (e.g. 3 AM every day).
+def _covers_every_hour(cron: croniter) -> bool:
+    """Check whether the given cron runs at least once an hour.
 
-    :return: True if the schedule has a fixed time, False if not.
+    This indicates whether we need to implement a workaround for (what I call)
+    the "fold hour problem". Folding happens when a region switches time
+    backwards, usually as a part of ending a DST period, causing a block of time
+    to occur twice in the wall clock. This is indicated by the ``fold`` flag on
+    datetime.
 
-    Detection is done by "peeking" the next two cron trigger time; if the
-    two times have the same minute and hour value, the schedule is fixed,
-    and we *don't* need to perform the DST fix.
+    As an example, Switzerland in 2023 ended DST at 3am (wall clock time, UTC+2)
+    by dialing back the clock to 2am (UTC+1). So for (say) ``30 * * * *``, if
+    the last run was 2:30am (UTC+2), the next needs to be 2:30am (UTC+1, folded)
+    instead of 3:30am.
 
-    This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
+    While this technically happens for all runs (in such a timezone), we only
+    really care about runs that happen at least once an hour, and can
+    provide a somewhat reasonable rationale to skip the fold hour for things
+    such as ``*/2`` (every two hours). So we try to *minimally* peek into croniter
+    internals to work around the issue.
+
+    The check is simple since croniter internally normalizes things to ``*``.
+    More edge cases can be added later as needed.
+
+    See also: https://github.com/kiorky/croniter/issues/56.
     """
-    cron = croniter(expression)
-    next_a = cron.get_next(datetime.datetime)
-    next_b = cron.get_next(datetime.datetime)
-    return next_b.minute == next_a.minute and next_b.hour == next_a.hour
+    return cron.expanded[1] == ["*"]
 
 
 class CronMixin:
@@ -91,18 +101,12 @@ class CronMixin:
         except (CroniterBadCronError, CroniterBadDateError) as e:
             raise AirflowTimetableInvalid(str(e))
 
-    @cached_property
-    def _should_fix_dst(self) -> bool:
-        # This is lazy so instantiating a schedule does not immediately raise
-        # an exception. Validity is checked with validate() during DAG-bagging.
-        return not _is_schedule_fixed(self._expression)
-
     def _get_next(self, current: DateTime) -> DateTime:
         """Get the first schedule after specified time, with DST fixed."""
         naive = make_naive(current, self._timezone)
         cron = croniter(self._expression, start_time=naive)
         scheduled = cron.get_next(datetime.datetime)
-        if not self._should_fix_dst:
+        if not _covers_every_hour(cron):
             return convert_to_utc(make_aware(scheduled, self._timezone))
         delta = scheduled - naive
         return convert_to_utc(current.in_timezone(self._timezone) + delta)
@@ -112,7 +116,7 @@ class CronMixin:
         naive = make_naive(current, self._timezone)
         cron = croniter(self._expression, start_time=naive)
         scheduled = cron.get_prev(datetime.datetime)
-        if not self._should_fix_dst:
+        if not _covers_every_hour(cron):
             return convert_to_utc(make_aware(scheduled, self._timezone))
         delta = naive - scheduled
         return convert_to_utc(current.in_timezone(self._timezone) - delta)
diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py
index 596c274cf7..928ae83ab3 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -253,3 +253,308 @@ def test_cron_next_dagrun_info_alignment(last_data_interval: DataInterval, expec
         restriction=TimeRestriction(None, None, True),
     )
     assert info == expected_info
+
+
+class TestCronIntervalDst:
+    """Test cron interval timetable can correctly enter a DST boundary.
+
+    Zurich (Switzerland) is chosen since it is +1/+2 DST, making it a bit easier
+    to get my head around the mental timezone conversion.
+
+    In 2023, DST entered on 26th Mar, 2am local clocks (1am UTC) were turned
+    forward to 3am. DST exited on 29th Oct, 3am local clocks (1am UTC) were
+    turned backward to 2am (making the 2:XX hour fold).
+    """
+
+    def test_entering_exact(self) -> None:
+        timetable = CronDataIntervalTimetable("0 3 * * *", timezone="Europe/Zurich")
+        restriction = TimeRestriction(
+            earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE),
+            latest=None,
+            catchup=True,
+        )
+
+        # Last run before DST. Interval starts and ends on 2am UTC (local time is +1).
+        next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction)
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 3, 24, 2, tz=TIMEZONE),
+            pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE),
+        )
+
+        # Crossing the DST switch. Interval starts on 2am UTC (local time +1)
+        # but ends on 1am UTC (local time is +2).
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE),
+            pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE),
+        )
+
+        # In DST. Interval starts and ends on 1am UTC (local time is +2).
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE),
+            pendulum.datetime(2023, 3, 27, 1, tz=TIMEZONE),
+        )
+
+    def test_entering_skip(self) -> None:
+        timetable = CronDataIntervalTimetable("0 2 * * *", timezone="Europe/Zurich")
+        restriction = TimeRestriction(
+            earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE),
+            latest=None,
+            catchup=True,
+        )
+
+        # Last run before DST. Interval starts and ends on 1am UTC (local time is +1).
+        next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction)
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 3, 24, 1, tz=TIMEZONE),
+            pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE),
+        )
+
+        # Crossing the DST switch. Interval starts on 1am UTC (local time +1)
+        # and ends on 1am UTC (local time is +2) since the 2am wall clock time
+        # does not logically exist due to entering DST.
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE),
+            pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE),
+        )
+
+        # In DST. Interval starts on 1am UTC (local time is +2 but 2am local
+        # time is not possible) and ends on 0am UTC.
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE),
+            pendulum.datetime(2023, 3, 27, 0, tz=TIMEZONE),
+        )
+
+    def test_exiting_exact(self) -> None:
+        timetable = CronDataIntervalTimetable("0 3 * * *", timezone="Europe/Zurich")
+        restriction = TimeRestriction(
+            earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE),
+            latest=None,
+            catchup=True,
+        )
+
+        # Last run in DST. Interval starts and ends on 1am UTC (local time is +2).
+        next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction)
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 10, 27, 1, tz=TIMEZONE),
+            pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE),
+        )
+
+        # Crossing the DST switch. Interval starts on 1am UTC (local time +2)
+        # and ends on 2am UTC (local time +1).
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE),
+            pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE),
+        )
+
+        # Out of DST. Interval starts and ends on 2am UTC (local time is +1).
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE),
+            pendulum.datetime(2023, 10, 30, 2, tz=TIMEZONE),
+        )
+
+    def test_exiting_fold(self) -> None:
+        timetable = CronDataIntervalTimetable("0 2 * * *", timezone="Europe/Zurich")
+        restriction = TimeRestriction(
+            earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE),
+            latest=None,
+            catchup=True,
+        )
+
+        # Last run before folding. Interval starts and ends on 0am UTC (local
+        # time is +2).
+        next_info = timetable.next_dagrun_info(last_automated_data_interval=None, restriction=restriction)
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 10, 27, 0, tz=TIMEZONE),
+            pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE),
+        )
+
+        # Account for folding. Interval starts on 0am UTC (local time +2) and
+        # ends on 1am UTC (local time +1). There are two 2am local times on the
+        # 29th due to folding. We end on the second one (fold=1). There's no
+        # logical reason here; this is simply what Airflow has been doing since
+        # a long time ago, and there's no point breaking it.
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE),
+            pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE),
+        )
+
+        # Stepping out of DST. Interval starts from the folded 2am local time
+        # (1am UTC out of DST) since that is when the previous interval ended.
+        # It ends at 1am UTC (local time is +1) normally.
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE),
+            pendulum.datetime(2023, 10, 30, 1, tz=TIMEZONE),
+        )
+
+
+class TestCronIntervalDstNonTrivial:
+    """These tests are similar to TestCronIntervalDst but with a different cron.
+
+    The original test cases are from apache/airflow#7999. In 2020 at Los Angeles,
+    DST started on 8th Mar; 10am UTC was turned from 2am UTC-8 to 3am UTC-7.
+    """
+
+    def test_7_to_8_entering(self):
+        timetable = CronDataIntervalTimetable("0 7-8 * * *", timezone="America/Los_Angeles")
+        restriction = TimeRestriction(
+            earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE),
+            latest=None,
+            catchup=True,
+        )
+
+        # Triggers as expected before the interval touches the DST transition.
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=None,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE),
+            pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE),
+        )
+
+        # This interval ends an hour early since it includes the DST switch!
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE),
+            pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE),
+        )
+
+        # We're fully into DST so the interval is as expected.
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE),
+            pendulum.datetime(2020, 3, 8, 8 + 7, tz=TIMEZONE),
+        )
+
+    def test_7_and_9_entering(self):
+        timetable = CronDataIntervalTimetable("0 7,9 * * *", timezone="America/Los_Angeles")
+        restriction = TimeRestriction(
+            earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE),
+            latest=None,
+            catchup=True,
+        )
+
+        # Triggers as expected before the interval touches the DST transition.
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=None,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE),
+            pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE),
+        )
+
+        # This interval ends an hour early since it includes the DST switch!
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE),
+            pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE),
+        )
+
+        # We're fully into DST so the interval is as expected.
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=next_info.data_interval,
+            restriction=restriction,
+        )
+        assert next_info and next_info.data_interval == DataInterval(
+            pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE),
+            pendulum.datetime(2020, 3, 8, 9 + 7, tz=TIMEZONE),
+        )
+
+
+def test_fold_scheduling():
+    timetable = CronDataIntervalTimetable("*/30 * * * *", timezone="Europe/Zurich")
+    restriction = TimeRestriction(
+        earliest=pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE),  # Locally 1:30 (DST).
+        latest=None,
+        catchup=True,
+    )
+
+    # Still in DST, acting normally.
+    next_info = timetable.next_dagrun_info(
+        last_automated_data_interval=None,
+        restriction=restriction,
+    )
+    assert next_info and next_info.data_interval == DataInterval(
+        pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE),
+        pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE),  # Locally 2am (DST).
+    )
+    next_info = timetable.next_dagrun_info(
+        last_automated_data_interval=next_info.data_interval,
+        restriction=restriction,
+    )
+    assert next_info and next_info.data_interval == DataInterval(
+        pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE),
+        pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE),  # Locally 2:30 (DST).
+    )
+
+    # Crossing into fold.
+    next_info = timetable.next_dagrun_info(
+        last_automated_data_interval=next_info.data_interval,
+        restriction=restriction,
+    )
+    assert next_info and next_info.data_interval == DataInterval(
+        pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE),
+        pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE),  # Locally 2am (fold, not DST).
+    )
+
+    # In the "fold zone".
+    next_info = timetable.next_dagrun_info(
+        last_automated_data_interval=next_info.data_interval,
+        restriction=restriction,
+    )
+    assert next_info and next_info.data_interval == DataInterval(
+        pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE),
+        pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE),  # Locally 2:30 (fold, not DST).
+    )
+
+    # Stepping out of fold.
+    next_info = timetable.next_dagrun_info(
+        last_automated_data_interval=next_info.data_interval,
+        restriction=restriction,
+    )
+    assert next_info and next_info.data_interval == DataInterval(
+        pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE),
+        pendulum.datetime(2023, 10, 29, 2, 0, tz=TIMEZONE),  # Locally 3am (not DST).
+    )

Reply via email to