This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c4549d721f Account for change in UTC offset when calculating next
schedule (#35887)
c4549d721f is described below
commit c4549d721f3b8127e023f1953378484f02e5919b
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Dec 6 15:40:34 2023 +0800
Account for change in UTC offset when calculating next schedule (#35887)
Co-authored-by: Daniel Standish
<[email protected]>
---
airflow/models/dag.py | 25 ++-
airflow/timetables/_cron.py | 44 ++--
tests/timetables/test_interval_timetable.py | 305 ++++++++++++++++++++++++++++
3 files changed, 351 insertions(+), 23 deletions(-)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index e93d5a55a0..654bc85cae 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -848,16 +848,35 @@ class DAG(LoggingMixin):
return [info.logical_date for info in it]
def is_fixed_time_schedule(self):
+ """Figures out if the schedule has a fixed time (e.g. 3 AM every day).
+
+ Detection is done by "peeking" the next two cron trigger times; if the
+ two times have the same minute and hour value, the schedule is fixed,
+ and we *don't* need to perform the DST fix.
+
+ This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
+
+ Do not try to understand what this actually means. It is old logic that
+ should not be used anywhere.
+ """
warnings.warn(
"`DAG.is_fixed_time_schedule()` is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
- try:
- return not self.timetable._should_fix_dst
- except AttributeError:
+
+ from airflow.timetables._cron import CronMixin
+
+ if not isinstance(self.timetable, CronMixin):
return True
+ from croniter import croniter
+
+ cron = croniter(self.timetable._expression)
+ next_a = cron.get_next(datetime.datetime)
+ next_b = cron.get_next(datetime.datetime)
+ return next_b.minute == next_a.minute and next_b.hour == next_a.hour
+
def following_schedule(self, dttm):
"""
Calculate the following schedule for this dag in UTC.
diff --git a/airflow/timetables/_cron.py b/airflow/timetables/_cron.py
index f9b8efa465..9f5878b197 100644
--- a/airflow/timetables/_cron.py
+++ b/airflow/timetables/_cron.py
@@ -17,7 +17,6 @@
from __future__ import annotations
import datetime
-from functools import cached_property
from typing import TYPE_CHECKING, Any
from cron_descriptor import CasingTypeEnum, ExpressionDescriptor,
FormatException, MissingFieldException
@@ -32,21 +31,32 @@ if TYPE_CHECKING:
from pendulum import DateTime
-def _is_schedule_fixed(expression: str) -> bool:
- """Figures out if the schedule has a fixed time (e.g. 3 AM every day).
+def _covers_every_hour(cron: croniter) -> bool:
+ """Check whether the given cron runs at least once an hour.
- :return: True if the schedule has a fixed time, False if not.
+ This indicates whether we need to implement a workaround for (what I call)
+ the "fold hour problem". Folding happens when a region switches time
+ backwards, usually as a part of ending a DST period, causing a block of
time
+ to occur twice in the wall clock. This is indicated by the ``fold`` flag on
+ datetime.
- Detection is done by "peeking" the next two cron trigger time; if the
- two times have the same minute and hour value, the schedule is fixed,
- and we *don't* need to perform the DST fix.
+ As an example, Switzerland in 2023 ended DST at 3am (wall clock time,
UTC+2)
+ by dialing back the clock to 2am (UTC+1). So for (say) ``30 * * * *``, if
+ the last run was 2:30am (UTC+2), the next needs to be 2:30am (UTC+1,
folded)
+ instead of 3:30am.
- This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
+ While this technically happens for all runs (in such a timezone), we only
+ really care about runs that happen at least once an hour, and can
+ provide a somewhat reasonable rationale to skip the fold hour for things
+ such as ``*/2`` (every two hours). So we try to *minimally* peek into
croniter
+ internals to work around the issue.
+
+ The check is simple since croniter internally normalizes things to ``*``.
+ More edge cases can be added later as needed.
+
+ See also: https://github.com/kiorky/croniter/issues/56.
"""
- cron = croniter(expression)
- next_a = cron.get_next(datetime.datetime)
- next_b = cron.get_next(datetime.datetime)
- return next_b.minute == next_a.minute and next_b.hour == next_a.hour
+ return cron.expanded[1] == ["*"]
class CronMixin:
@@ -91,18 +101,12 @@ class CronMixin:
except (CroniterBadCronError, CroniterBadDateError) as e:
raise AirflowTimetableInvalid(str(e))
- @cached_property
- def _should_fix_dst(self) -> bool:
- # This is lazy so instantiating a schedule does not immediately raise
- # an exception. Validity is checked with validate() during DAG-bagging.
- return not _is_schedule_fixed(self._expression)
-
def _get_next(self, current: DateTime) -> DateTime:
"""Get the first schedule after specified time, with DST fixed."""
naive = make_naive(current, self._timezone)
cron = croniter(self._expression, start_time=naive)
scheduled = cron.get_next(datetime.datetime)
- if not self._should_fix_dst:
+ if not _covers_every_hour(cron):
return convert_to_utc(make_aware(scheduled, self._timezone))
delta = scheduled - naive
return convert_to_utc(current.in_timezone(self._timezone) + delta)
@@ -112,7 +116,7 @@ class CronMixin:
naive = make_naive(current, self._timezone)
cron = croniter(self._expression, start_time=naive)
scheduled = cron.get_prev(datetime.datetime)
- if not self._should_fix_dst:
+ if not _covers_every_hour(cron):
return convert_to_utc(make_aware(scheduled, self._timezone))
delta = naive - scheduled
return convert_to_utc(current.in_timezone(self._timezone) - delta)
diff --git a/tests/timetables/test_interval_timetable.py
b/tests/timetables/test_interval_timetable.py
index 596c274cf7..928ae83ab3 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -253,3 +253,308 @@ def
test_cron_next_dagrun_info_alignment(last_data_interval: DataInterval, expec
restriction=TimeRestriction(None, None, True),
)
assert info == expected_info
+
+
+class TestCronIntervalDst:
+ """Test cron interval timetable can correctly enter and exit a DST boundary.
+
+ Zurich (Switzerland) is chosen since it is +1/+2 DST, making it a bit
easier
+ to get my head around the mental timezone conversion.
+
+ In 2023, DST entered on 26th Mar, 2am local clocks (1am UTC) were turned
+ forward to 3am. DST exited on 29th Oct, 3am local clocks (1am UTC) were
+ turned backward to 2am (making the 2:XX hour fold).
+ """
+
+ def test_entering_exact(self) -> None:
+ timetable = CronDataIntervalTimetable("0 3 * * *",
timezone="Europe/Zurich")
+ restriction = TimeRestriction(
+ earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE),
+ latest=None,
+ catchup=True,
+ )
+
+ # Last run before DST. Interval starts and ends on 2am UTC (local time
is +1).
+ next_info =
timetable.next_dagrun_info(last_automated_data_interval=None,
restriction=restriction)
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 3, 24, 2, tz=TIMEZONE),
+ pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE),
+ )
+
+ # Crossing the DST switch. Interval starts on 2am UTC (local time +1)
+ # but ends on 1am UTC (local time is +2).
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 3, 25, 2, tz=TIMEZONE),
+ pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE),
+ )
+
+ # In DST. Interval starts and ends on 1am UTC (local time is +2).
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE),
+ pendulum.datetime(2023, 3, 27, 1, tz=TIMEZONE),
+ )
+
+ def test_entering_skip(self) -> None:
+ timetable = CronDataIntervalTimetable("0 2 * * *",
timezone="Europe/Zurich")
+ restriction = TimeRestriction(
+ earliest=pendulum.datetime(2023, 3, 24, tz=TIMEZONE),
+ latest=None,
+ catchup=True,
+ )
+
+ # Last run before DST. Interval starts and ends on 1am UTC (local time
is +1).
+ next_info =
timetable.next_dagrun_info(last_automated_data_interval=None,
restriction=restriction)
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 3, 24, 1, tz=TIMEZONE),
+ pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE),
+ )
+
+ # Crossing the DST switch. Interval starts on 1am UTC (local time +1)
+ # and ends on 1am UTC (local time is +2) since the 2am wall clock time
+ # does not logically exist due to entering DST.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 3, 25, 1, tz=TIMEZONE),
+ pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE),
+ )
+
+ # In DST. Interval starts on 1am UTC (local time is +2 but 2am local
+ # time is not possible) and ends on 0am UTC.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 3, 26, 1, tz=TIMEZONE),
+ pendulum.datetime(2023, 3, 27, 0, tz=TIMEZONE),
+ )
+
+ def test_exiting_exact(self) -> None:
+ timetable = CronDataIntervalTimetable("0 3 * * *",
timezone="Europe/Zurich")
+ restriction = TimeRestriction(
+ earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE),
+ latest=None,
+ catchup=True,
+ )
+
+ # Last run in DST. Interval starts and ends on 1am UTC (local time is
+2).
+ next_info =
timetable.next_dagrun_info(last_automated_data_interval=None,
restriction=restriction)
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 27, 1, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE),
+ )
+
+ # Crossing the DST switch. Interval starts on 1am UTC (local time +2)
+ # and ends on 2am UTC (local time +1).
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 28, 1, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE),
+ )
+
+ # Out of DST. Interval starts and ends on 2am UTC (local time is +1).
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 29, 2, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 30, 2, tz=TIMEZONE),
+ )
+
+ def test_exiting_fold(self) -> None:
+ timetable = CronDataIntervalTimetable("0 2 * * *",
timezone="Europe/Zurich")
+ restriction = TimeRestriction(
+ earliest=pendulum.datetime(2023, 10, 27, tz=TIMEZONE),
+ latest=None,
+ catchup=True,
+ )
+
+ # Last run before folding. Interval starts and ends on 0am UTC (local
+ # time is +2).
+ next_info =
timetable.next_dagrun_info(last_automated_data_interval=None,
restriction=restriction)
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 27, 0, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE),
+ )
+
+ # Account for folding. Interval starts on 0am UTC (local time +2) and
+ # ends on 1am UTC (local time +1). There are two 2am local times on the
+ # 29th due to folding. We end on the second one (fold=1). There's no
+ # logical reason here; this is simply what Airflow has been doing for
+ # a long time, and there's no point breaking it.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 28, 0, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE),
+ )
+
+ # Stepping out of DST. Interval starts from the folded 2am local time
+ # (1am UTC out of DST) since that is when the previous interval ended.
+ # It ends at 1am UTC (local time is +1) normally.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 29, 1, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 30, 1, tz=TIMEZONE),
+ )
+
+
+class TestCronIntervalDstNonTrivial:
+ """These tests are similar to TestCronIntervalDst but with a different
cron.
+
+ The original test cases are from apache/airflow#7999. In 2020 in Los
Angeles,
+ DST started on 8th Mar; 10am UTC was turned from 2am UTC-8 to 3am UTC-7.
+ """
+
+ def test_7_to_8_entering(self):
+ timetable = CronDataIntervalTimetable("0 7-8 * * *",
timezone="America/Los_Angeles")
+ restriction = TimeRestriction(
+ earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE),
+ latest=None,
+ catchup=True,
+ )
+
+ # Triggers as expected before the interval touches the DST transition.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=None,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE),
+ pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE),
+ )
+
+ # This interval ends an hour early since it includes the DST switch!
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2020, 3, 7, 8 + 8, tz=TIMEZONE),
+ pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE),
+ )
+
+ # We're fully into DST so the interval is as expected.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE),
+ pendulum.datetime(2020, 3, 8, 8 + 7, tz=TIMEZONE),
+ )
+
+ def test_7_and_9_entering(self):
+ timetable = CronDataIntervalTimetable("0 7,9 * * *",
timezone="America/Los_Angeles")
+ restriction = TimeRestriction(
+ earliest=pendulum.datetime(2020, 3, 7, tz=TIMEZONE),
+ latest=None,
+ catchup=True,
+ )
+
+ # Triggers as expected before the interval touches the DST transition.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=None,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2020, 3, 7, 7 + 8, tz=TIMEZONE),
+ pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE),
+ )
+
+ # This interval ends an hour early since it includes the DST switch!
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2020, 3, 7, 9 + 8, tz=TIMEZONE),
+ pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE),
+ )
+
+ # We're fully into DST so the interval is as expected.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2020, 3, 8, 7 + 7, tz=TIMEZONE),
+ pendulum.datetime(2020, 3, 8, 9 + 7, tz=TIMEZONE),
+ )
+
+
+def test_fold_scheduling():
+ timetable = CronDataIntervalTimetable("*/30 * * * *",
timezone="Europe/Zurich")
+ restriction = TimeRestriction(
+ earliest=pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE), #
Locally 1:30 (DST).
+ latest=None,
+ catchup=True,
+ )
+
+ # Still in DST, acting normally.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=None,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 28, 23, 30, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE), # Locally 2am
(DST).
+ )
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 29, 0, 0, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE), # Locally 2:30
(DST).
+ )
+
+ # Crossing into fold.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 29, 0, 30, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE), # Locally 2am
(fold, not DST).
+ )
+
+ # In the "fold zone".
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 29, 1, 0, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE), # Locally 2:30
(fold, not DST).
+ )
+
+ # Stepping out of fold.
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=next_info.data_interval,
+ restriction=restriction,
+ )
+ assert next_info and next_info.data_interval == DataInterval(
+ pendulum.datetime(2023, 10, 29, 1, 30, tz=TIMEZONE),
+ pendulum.datetime(2023, 10, 29, 2, 0, tz=TIMEZONE), # Locally 3am
(not DST).
+ )