This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 74c62f24607 [Backport] Introduce DeltaTriggerTimetable (#47074)
74c62f24607 is described below
commit 74c62f246078fba531d66eb431bbfd603f03dfec
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Feb 27 02:15:17 2025 +0800
[Backport] Introduce DeltaTriggerTimetable (#47074)
---
airflow/config_templates/config.yml | 21 +-
airflow/timetables/_delta.py | 56 +++++
airflow/timetables/interval.py | 30 +--
airflow/timetables/trigger.py | 154 +++++++++----
.../authoring-and-scheduling/timetable.rst | 104 ++++++---
tests/timetables/test_interval_timetable.py | 25 +-
tests/timetables/test_trigger_timetable.py | 251 +++++++++++++++++++--
7 files changed, 525 insertions(+), 116 deletions(-)
diff --git a/airflow/config_templates/config.yml
b/airflow/config_templates/config.yml
index 04b4e27843f..d90a931db23 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2661,7 +2661,26 @@ scheduler:
type: boolean
example: ~
default: "True"
- see_also: ":ref:`Differences between the two cron timetables`"
+ see_also: ':ref:`Differences between "trigger" and "data interval"
timetables`'
+ create_delta_data_intervals:
+ description: |
+ Whether to create DAG runs that span an interval or one single point
in time when a timedelta or
+ relativedelta is provided to ``schedule`` argument of a DAG.
+
+ * ``True``: **DeltaDataIntervalTimetable** is used, which is suitable
for DAGs with well-defined data
+ interval. You get contiguous intervals from the end of the previous
interval up to the scheduled
+ datetime.
+ * ``False``: **DeltaTriggerTimetable** is used, which is suitable for
DAGs that simply want to say
+ e.g. "run this every day" and do not care about the data interval.
+
+ Notably, for **DeltaTriggerTimetable**, the logical date is the same
as the time the DAG Run will
+ try to schedule, while for **DeltaDataIntervalTimetable**, the logical
date is the beginning of
+ the data interval, but the DAG Run will try to schedule at the end of
the data interval.
+ version_added: 2.11.0
+ type: boolean
+ example: ~
+ default: "True"
+ see_also: ':ref:`Differences between "trigger" and "data interval"
timetables`'
triggerer:
description: ~
options:
diff --git a/airflow/timetables/_delta.py b/airflow/timetables/_delta.py
new file mode 100644
index 00000000000..7203cd40631
--- /dev/null
+++ b/airflow/timetables/_delta.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+from typing import TYPE_CHECKING
+
+from airflow.exceptions import AirflowTimetableInvalid
+from airflow.utils.timezone import convert_to_utc
+
+if TYPE_CHECKING:
+ from dateutil.relativedelta import relativedelta
+ from pendulum import DateTime
+
+
+class DeltaMixin:
+ """Mixin to provide interface to work with timedelta and relativedelta."""
+
+ def __init__(self, delta: datetime.timedelta | relativedelta) -> None:
+ self._delta = delta
+
+ @property
+ def summary(self) -> str:
+ return str(self._delta)
+
+ def validate(self) -> None:
+ now = datetime.datetime.now()
+ if (now + self._delta) <= now:
+ raise AirflowTimetableInvalid(f"schedule interval must be
positive, not {self._delta!r}")
+
+ def _get_next(self, current: DateTime) -> DateTime:
+ return convert_to_utc(current + self._delta)
+
+ def _get_prev(self, current: DateTime) -> DateTime:
+ return convert_to_utc(current - self._delta)
+
+ def _align_to_next(self, current: DateTime) -> DateTime:
+ return current
+
+ def _align_to_prev(self, current: DateTime) -> DateTime:
+ return current
diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index ef4b5d0afc4..81d53e35378 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -22,10 +22,10 @@ from typing import TYPE_CHECKING, Any, Union
from dateutil.relativedelta import relativedelta
from pendulum import DateTime
-from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables._cron import CronMixin
+from airflow.timetables._delta import DeltaMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
-from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow
+from airflow.utils.timezone import coerce_datetime, utcnow
if TYPE_CHECKING:
from airflow.timetables.base import TimeRestriction
@@ -173,7 +173,7 @@ class CronDataIntervalTimetable(CronMixin,
_DataIntervalTimetable):
return DataInterval(start=self._get_prev(end), end=end)
-class DeltaDataIntervalTimetable(_DataIntervalTimetable):
+class DeltaDataIntervalTimetable(DeltaMixin, _DataIntervalTimetable):
"""
Timetable that schedules data intervals with a time delta.
@@ -182,9 +182,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
instance.
"""
- def __init__(self, delta: Delta) -> None:
- self._delta = delta
-
@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
from airflow.serialization.serialized_objects import
decode_relativedelta
@@ -204,10 +201,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
return NotImplemented
return self._delta == other._delta
- @property
- def summary(self) -> str:
- return str(self._delta)
-
def serialize(self) -> dict[str, Any]:
from airflow.serialization.serialized_objects import
encode_relativedelta
@@ -218,23 +211,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
delta = encode_relativedelta(self._delta)
return {"delta": delta}
- def validate(self) -> None:
- now = datetime.datetime.now()
- if (now + self._delta) <= now:
- raise AirflowTimetableInvalid(f"schedule interval must be
positive, not {self._delta!r}")
-
- def _get_next(self, current: DateTime) -> DateTime:
- return convert_to_utc(current + self._delta)
-
- def _get_prev(self, current: DateTime) -> DateTime:
- return convert_to_utc(current - self._delta)
-
- def _align_to_next(self, current: DateTime) -> DateTime:
- return current
-
- def _align_to_prev(self, current: DateTime) -> DateTime:
- return current
-
@staticmethod
def _relativedelta_in_seconds(delta: relativedelta) -> int:
return (
diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py
index a4666946fa7..8dad7dfecd1 100644
--- a/airflow/timetables/trigger.py
+++ b/airflow/timetables/trigger.py
@@ -20,8 +20,9 @@ import datetime
from typing import TYPE_CHECKING, Any
from airflow.timetables._cron import CronMixin
+from airflow.timetables._delta import DeltaMixin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
-from airflow.utils import timezone
+from airflow.utils.timezone import coerce_datetime, utcnow
if TYPE_CHECKING:
from dateutil.relativedelta import relativedelta
@@ -31,60 +32,43 @@ if TYPE_CHECKING:
from airflow.timetables.base import TimeRestriction
-class CronTriggerTimetable(CronMixin, Timetable):
- """
- Timetable that triggers DAG runs according to a cron expression.
-
- This is different from ``CronDataIntervalTimetable``, where the cron
- expression specifies the *data interval* of a DAG run. With this timetable,
- the data intervals are specified independently from the cron expression.
- Also for the same reason, this timetable kicks off a DAG run immediately at
- the start of the period (similar to POSIX cron), instead of needing to wait
- for one data interval to pass.
+def _serialize_interval(interval: datetime.timedelta | relativedelta) -> float
| dict:
+ from airflow.serialization.serialized_objects import encode_relativedelta
- Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
- """
+ if isinstance(interval, datetime.timedelta):
+ return interval.total_seconds()
+ return encode_relativedelta(interval)
- def __init__(
- self,
- cron: str,
- *,
- timezone: str | Timezone | FixedTimezone,
- interval: datetime.timedelta | relativedelta = datetime.timedelta(),
- ) -> None:
- super().__init__(cron, timezone)
- self._interval = interval
- @classmethod
- def deserialize(cls, data: dict[str, Any]) -> Timetable:
- from airflow.serialization.serialized_objects import
decode_relativedelta, decode_timezone
+def _deserialize_interval(value: int | dict) -> datetime.timedelta |
relativedelta:
+ from airflow.serialization.serialized_objects import decode_relativedelta
- interval: datetime.timedelta | relativedelta
- if isinstance(data["interval"], dict):
- interval = decode_relativedelta(data["interval"])
- else:
- interval = datetime.timedelta(seconds=data["interval"])
- return cls(data["expression"],
timezone=decode_timezone(data["timezone"]), interval=interval)
+ if isinstance(value, dict):
+ return decode_relativedelta(value)
+ return datetime.timedelta(seconds=value)
- def serialize(self) -> dict[str, Any]:
- from airflow.serialization.serialized_objects import
encode_relativedelta, encode_timezone
- interval: float | dict[str, Any]
- if isinstance(self._interval, datetime.timedelta):
- interval = self._interval.total_seconds()
- else:
- interval = encode_relativedelta(self._interval)
- timezone = encode_timezone(self._timezone)
- return {"expression": self._expression, "timezone": timezone,
"interval": interval}
+class _TriggerTimetable(Timetable):
+ _interval: datetime.timedelta | relativedelta
def infer_manual_data_interval(self, *, run_after: DateTime) ->
DataInterval:
return DataInterval(
- # pendulum.Datetime ± timedelta should return pendulum.Datetime
- # however mypy decide that output would be datetime.datetime
- run_after - self._interval, # type: ignore[arg-type]
+ coerce_datetime(run_after - self._interval),
run_after,
)
+ def _align_to_next(self, current: DateTime) -> DateTime:
+ raise NotImplementedError()
+
+ def _align_to_prev(self, current: DateTime) -> DateTime:
+ raise NotImplementedError()
+
+ def _get_next(self, current: DateTime) -> DateTime:
+ raise NotImplementedError()
+
+ def _get_prev(self, current: DateTime) -> DateTime:
+ raise NotImplementedError()
+
def next_dagrun_info(
self,
*,
@@ -99,7 +83,7 @@ class CronTriggerTimetable(CronMixin, Timetable):
else:
next_start_time = self._align_to_next(restriction.earliest)
else:
- start_time_candidates =
[self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
+ start_time_candidates =
[self._align_to_prev(coerce_datetime(utcnow()))]
if last_automated_data_interval is not None:
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
if restriction.earliest is not None:
@@ -113,3 +97,85 @@ class CronTriggerTimetable(CronMixin, Timetable):
next_start_time - self._interval, # type: ignore[arg-type]
next_start_time,
)
+
+
+class DeltaTriggerTimetable(DeltaMixin, _TriggerTimetable):
+ """
+ Timetable that triggers DAG runs according to a time delta.
+
+ This is different from ``DeltaDataIntervalTimetable``, where the delta
value
+ specifies the *data interval* of a DAG run. With this timetable, the data
+ intervals are specified independently. Also for the same reason, this
+ timetable kicks off a DAG run immediately at the start of the period,
+ instead of needing to wait for one data interval to pass.
+
+ :param delta: How much time to wait between each run.
+ :param interval: The data interval of each run. Default is 0.
+ """
+
+ def __init__(
+ self,
+ delta: datetime.timedelta | relativedelta,
+ *,
+ interval: datetime.timedelta | relativedelta = datetime.timedelta(),
+ ) -> None:
+ super().__init__(delta)
+ self._interval = interval
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> Timetable:
+ return cls(
+ _deserialize_interval(data["delta"]),
+ interval=_deserialize_interval(data["interval"]),
+ )
+
+ def serialize(self) -> dict[str, Any]:
+ return {
+ "delta": _serialize_interval(self._delta),
+ "interval": _serialize_interval(self._interval),
+ }
+
+
+class CronTriggerTimetable(CronMixin, _TriggerTimetable):
+ """
+ Timetable that triggers DAG runs according to a cron expression.
+
+ This is different from ``CronDataIntervalTimetable``, where the cron
+ expression specifies the *data interval* of a DAG run. With this timetable,
+ the data intervals are specified independently from the cron expression.
+ Also for the same reason, this timetable kicks off a DAG run immediately at
+ the start of the period (similar to POSIX cron), instead of needing to wait
+ for one data interval to pass.
+
+ Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
+ """
+
+ def __init__(
+ self,
+ cron: str,
+ *,
+ timezone: str | Timezone | FixedTimezone,
+ interval: datetime.timedelta | relativedelta = datetime.timedelta(),
+ ) -> None:
+ super().__init__(cron, timezone)
+ self._interval = interval
+
+ @classmethod
+ def deserialize(cls, data: dict[str, Any]) -> Timetable:
+ from airflow.serialization.serialized_objects import
decode_relativedelta, decode_timezone
+
+ interval: datetime.timedelta | relativedelta
+ if isinstance(data["interval"], dict):
+ interval = decode_relativedelta(data["interval"])
+ else:
+ interval = datetime.timedelta(seconds=data["interval"])
+ return cls(data["expression"],
timezone=decode_timezone(data["timezone"]), interval=interval)
+
+ def serialize(self) -> dict[str, Any]:
+ from airflow.serialization.serialized_objects import encode_timezone
+
+ return {
+ "expression": self._expression,
+ "timezone": encode_timezone(self._timezone),
+ "interval": _serialize_interval(self._interval),
+ }
diff --git a/docs/apache-airflow/authoring-and-scheduling/timetable.rst
b/docs/apache-airflow/authoring-and-scheduling/timetable.rst
index d9b47dd9c44..698aeb850e4 100644
--- a/docs/apache-airflow/authoring-and-scheduling/timetable.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/timetable.rst
@@ -64,6 +64,53 @@ Built-in Timetables
Airflow comes with several common timetables built-in to cover the most common
use cases. Additional timetables
may be available in plugins.
+.. _DeltaTriggerTimetable:
+
+DeltaTriggerTimetable
+^^^^^^^^^^^^^^^^^^^^^
+
+A timetable that accepts a :class:`datetime.timedelta` or
``dateutil.relativedelta.relativedelta``, and runs
+the DAG once a delta passes.
+
+.. seealso:: `Differences between "trigger" and "data interval" timetables`_
+
+.. code-block:: python
+
+ from datetime import timedelta
+
+ from airflow.timetables.trigger import DeltaTriggerTimetable
+
+
+ @dag(schedule=DeltaTriggerTimetable(timedelta(days=7)), ...) # Once every
week.
+ def example_dag():
+ pass
+
+You can also provide a static data interval to the timetable. The optional
``interval`` argument
+should be a :class:`datetime.timedelta` or
``dateutil.relativedelta.relativedelta``. When using this
+argument, a triggered DAG run's data interval spans the specified duration,
and *ends* with the trigger time.
+
+.. code-block:: python
+
+ from datetime import UTC, datetime, timedelta
+
+ from dateutil.relativedelta import relativedelta, FR
+
+ from airflow.timetables.trigger import DeltaTriggerTimetable
+
+
+ @dag(
+ # Runs every Friday at 18:00 to cover the work week.
+ schedule=DeltaTriggerTimetable(
+ relativedelta(weekday=FR(), hour=18),
+ interval=timedelta(days=4, hours=9),
+ ),
+ start_date=datetime(2025, 1, 3, 18, tzinfo=UTC),
+ ...,
+ )
+ def example_dag():
+ pass
+
+
.. _CronTriggerTimetable:
CronTriggerTimetable
@@ -71,7 +118,7 @@ CronTriggerTimetable
A timetable that accepts a cron expression, and triggers DAG runs according to
it.
-.. seealso:: `Differences between the two cron timetables`_
+.. seealso:: `Differences between "trigger" and "data interval" timetables`_
.. code-block:: python
@@ -132,7 +179,7 @@ CronDataIntervalTimetable
A timetable that accepts a cron expression, creates data intervals according
to the interval between each cron
trigger points, and triggers a DAG run at the end of each data interval.
-.. seealso:: `Differences between the two cron timetables`_
+.. seealso:: `Differences between "trigger" and "data interval" timetables`_
.. seealso:: `Differences between the cron and delta data interval timetables`_
Select this timetable by providing a valid cron expression as a string to the
``schedule``
@@ -209,37 +256,39 @@ Here's an example of a DAG using
``DatasetOrTimeSchedule``:
Timetables comparisons
----------------------
-.. _Differences between the two cron timetables:
+.. _Differences between "trigger" and "data interval" timetables:
+
+Differences between "trigger" and "data interval" timetables
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-Differences between the two cron timetables
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Airflow has two sets of timetables for cron and delta schedules:
-Airflow has two timetables `CronTriggerTimetable`_ and
`CronDataIntervalTimetable`_ that accept a cron expression.
+* CronTriggerTimetable_ and CronDataIntervalTimetable_ both accept a cron
expression.
+* DeltaTriggerTimetable_ and DeltaDataIntervalTimetable_ both accept a
timedelta or relativedelta.
-However, there are differences between the two:
-- `CronTriggerTimetable`_ does not address *Data Interval*, while
`CronDataIntervalTimetable`_ does.
-- The timestamp in the ``run_id``, the ``logical_date`` for
`CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ are defined
differently based on how they handle the data interval, as described in
:ref:`timetables_run_id_logical_date`.
+- A trigger timetable (CronTriggerTimetable_ or DeltaTriggerTimetable_) does
not address the concept of *data interval*, while a "data interval" one
(CronDataIntervalTimetable_ or DeltaDataIntervalTimetable_) does.
+- The timestamp in the ``run_id``, the ``logical_date`` of the two timetable
kinds are defined differently based on how they handle the data interval, as
described in :ref:`timetables_run_id_logical_date`.
Whether taking care of *Data Interval*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-`CronTriggerTimetable`_ *does not* include *data interval*. This means that
the value of ``data_interval_start`` and
-``data_interval_end`` (and the legacy ``execution_date``) are the same; the
time when a DAG run is triggered.
+A trigger timetable *does not* include *data interval*. This means that the
value of ``data_interval_start``
+and ``data_interval_end`` (and the legacy ``execution_date``) are the same;
the time when a DAG run is
+triggered.
-However, `CronDataIntervalTimetable`_ *does* include *data interval*. This
means the value of
-``data_interval_start`` and ``data_interval_end`` (and legacy
``execution_date``) are different. ``data_interval_start`` is the time when a
-DAG run is triggered and ``data_interval_end`` is the end of the interval.
+For a data interval timetable, the value of ``data_interval_start`` and
``data_interval_end`` (and legacy
+``execution_date``) are different. ``data_interval_start`` is the time when a
DAG run is triggered and
+``data_interval_end`` is the end of the interval.
*Catchup* behavior
^^^^^^^^^^^^^^^^^^
-Whether you're using `CronTriggerTimetable`_ or `CronDataIntervalTimetable`_,
there is no difference when ``catchup`` is ``True``.
-
You might want to use ``False`` for ``catchup`` for certain scenarios, to
prevent running unnecessary DAGs:
- If you create a new DAG with a start date in the past, and don't want to run
DAGs for the past. If ``catchup`` is ``True``, Airflow runs all DAGs that would
have run in that time interval.
- If you pause an existing DAG, and then restart it at a later date, and don't
want to run the DAG runs that were missed while it was paused. If ``catchup``
is ``True``, Airflow runs all of them.
-In these scenarios, the ``logical_date`` in the ``run_id`` are based on how
`CronTriggerTimetable`_ or `CronDataIntervalTimetable`_ handle the data
interval.
+In these scenarios, the ``logical_date`` in the ``run_id`` is based on how
the timetable handles the data
+interval.
See :ref:`dag-catchup` for more information about how DAG runs are triggered
when using ``catchup``.
@@ -248,30 +297,29 @@ See :ref:`dag-catchup` for more information about how DAG
runs are triggered whe
The time when a DAG run is triggered
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-`CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ trigger DAG runs at
the same time. However, the timestamp for the ``run_id`` is different for each.
-
-- `CronTriggerTimetable`_ has a ``run_id`` timestamp, the ``logical_date``,
showing when DAG run is able to start.
-- `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ trigger DAG runs at
the same time. However, the timestamp for the ``run_id`` (``logical_date``) is
different for each.
+Both trigger and data interval timetables trigger DAG runs at the same time.
However, the timestamp for the
+``run_id`` is different for each. This is because ``run_id`` is based on
``logical_date``.
For example, suppose there is a cron expression ``@daily`` or ``0 0 * * *``,
which is scheduled to run at 12AM every day. If you enable DAGs using the two
timetables at 3PM on January
31st,
-- `CronTriggerTimetable`_ triggers a new DAG run at 12AM on February 1st. The
``run_id`` timestamp is midnight, on February 1st.
-- `CronDataIntervalTimetable`_ immediately triggers a new DAG run, because a
DAG run for the daily time interval beginning at 12AM on January 31st did not
occur yet. The ``run_id`` timestamp is midnight, on January 31st, since that is
the beginning of the data interval.
+- `CronTriggerTimetable`_ creates a new DAG run at 12AM on February 1st. The
``run_id`` timestamp is midnight, on February 1st.
+- `CronDataIntervalTimetable`_ immediately creates a new DAG run, because a
DAG run for the daily time interval beginning at 12AM on January 31st did not
occur yet. The ``run_id`` timestamp is midnight, on January 31st, since that is
the beginning of the data interval.
-This is another example showing the difference in the case of skipping DAG
runs.
+The following is another example showing the difference in the case of
skipping DAG runs:
Suppose there are two running DAGs with a cron expression ``@daily`` or ``0 0
* * *`` that use the two different timetables. If you pause the DAGs at 3PM on
January 31st and re-enable them at 3PM on February 2nd,
- `CronTriggerTimetable`_ skips the DAG runs that were supposed to trigger on
February 1st and 2nd. The next DAG run will be triggered at 12AM on February
3rd.
- `CronDataIntervalTimetable`_ skips the DAG runs that were supposed to
trigger on February 1st only. A DAG run for February 2nd is immediately
triggered after you re-enable the DAG.
-In these examples, you see how `CronTriggerTimetable`_ triggers DAG runs is
more intuitive and more similar to what
-people expect cron to behave than how `CronDataIntervalTimetable`_ does.
+In these examples, you see how a trigger timetable creates DAG runs more
intuitively, closer to how
+people expect a workflow to behave, while a data interval timetable is
designed heavily around the data
+interval it processes, and does not reflect a workflow's own properties.
.. _Differences between the cron and delta data interval timetables:
-Differences between the cron and delta data interval timetables:
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Differences between the cron and delta data interval timetables
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Choosing between `DeltaDataIntervalTimetable`_ and
`CronDataIntervalTimetable`_ depends on your use case.
If you enable a DAG at 01:05 on February 1st, the following table summarizes
the DAG runs created and the
diff --git a/tests/timetables/test_interval_timetable.py
b/tests/timetables/test_interval_timetable.py
index 90883206551..3c40174d8bf 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -181,9 +181,28 @@ def test_validate_failure(timetable: Timetable,
error_message: str) -> None:
assert str(ctx.value) == error_message
-def test_cron_interval_timezone_from_string():
- timetable = CronDataIntervalTimetable("@hourly", "UTC")
- assert timetable.serialize()["timezone"] == "UTC"
+def test_cron_interval_serialize():
+ data = HOURLY_CRON_TIMETABLE.serialize()
+ assert data == {"expression": "0 * * * *", "timezone": "UTC"}
+ tt = CronDataIntervalTimetable.deserialize(data)
+ assert isinstance(tt, CronDataIntervalTimetable)
+ assert tt._expression == HOURLY_CRON_TIMETABLE._expression
+ assert tt._timezone == HOURLY_CRON_TIMETABLE._timezone
+
+
[email protected](
+ "timetable, expected_data",
+ [
+ (HOURLY_RELATIVEDELTA_TIMETABLE, {"delta": {"hours": 1}}),
+ (HOURLY_TIMEDELTA_TIMETABLE, {"delta": 3600.0}),
+ ],
+)
+def test_delta_interval_serialize(timetable, expected_data):
+ data = timetable.serialize()
+ assert data == expected_data
+ tt = DeltaDataIntervalTimetable.deserialize(data)
+ assert isinstance(tt, DeltaDataIntervalTimetable)
+ assert tt._delta == timetable._delta
@pytest.mark.parametrize(
diff --git a/tests/timetables/test_trigger_timetable.py
b/tests/timetables/test_trigger_timetable.py
index 5165a14b3c1..a3e4b3fe6ec 100644
--- a/tests/timetables/test_trigger_timetable.py
+++ b/tests/timetables/test_trigger_timetable.py
@@ -25,8 +25,8 @@ import pytest
import time_machine
from airflow.exceptions import AirflowTimetableInvalid
-from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
-from airflow.timetables.trigger import CronTriggerTimetable
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
+from airflow.timetables.trigger import CronTriggerTimetable,
DeltaTriggerTimetable
from airflow.utils.timezone import utc
START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc)
@@ -39,6 +39,9 @@ YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)
HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=utc)
+HOURLY_TIMEDELTA_TIMETABLE = DeltaTriggerTimetable(datetime.timedelta(hours=1))
+HOURLY_RELATIVEDELTA_TIMETABLE =
DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(hours=1))
+
DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
@@ -76,6 +79,47 @@ def
test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
assert next_info == DagRunInfo.exact(next_start_time)
[email protected](
+ "last_automated_data_interval, next_start_time",
+ [
+ pytest.param(
+ None,
+ CURRENT_TIME,
+ id="first-run",
+ ),
+ pytest.param(
+ DataInterval.exact(YESTERDAY + DELTA_FROM_MIDNIGHT),
+ CURRENT_TIME + DELTA_FROM_MIDNIGHT,
+ id="before-now",
+ ),
+ pytest.param(
+ DataInterval.exact(CURRENT_TIME + DELTA_FROM_MIDNIGHT),
+ CURRENT_TIME + datetime.timedelta(days=1) + DELTA_FROM_MIDNIGHT,
+ id="after-now",
+ ),
+ ],
+)
[email protected](
+ "timetable",
+ [
+ pytest.param(DeltaTriggerTimetable(datetime.timedelta(days=1)),
id="timedelta"),
+
pytest.param(DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(days=1)),
id="relativedelta"),
+ ],
+)
+@time_machine.travel(CURRENT_TIME)
+def test_daily_delta_trigger_no_catchup_first_starts_at_next_schedule(
+ last_automated_data_interval: DataInterval | None,
+ next_start_time: pendulum.DateTime,
+ timetable: Timetable,
+) -> None:
+ """If ``catchup=False`` and start_date is a day before"""
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=last_automated_data_interval,
+ restriction=TimeRestriction(earliest=YESTERDAY, latest=None,
catchup=False),
+ )
+ assert next_info == DagRunInfo.exact(next_start_time)
+
+
@pytest.mark.parametrize(
"current_time, earliest, expected",
[
@@ -124,6 +168,62 @@ def test_hourly_cron_trigger_no_catchup_next_info(
assert next_info == expected
[email protected](
+ "current_time, earliest, expected",
+ [
+ pytest.param(
+ pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc),
+ START_DATE,
+ DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0,
tzinfo=utc)),
+ id="current_time_on_boundary",
+ ),
+ pytest.param(
+ pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc),
+ START_DATE,
+ DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0,
tzinfo=utc)),
+ id="current_time_not_on_boundary",
+ ),
+ pytest.param(
+ pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc),
+ START_DATE,
+ DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0,
tzinfo=utc)),
+ id="current_time_miss_one_interval_on_boundary",
+ ),
+ pytest.param(
+ pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc),
+ START_DATE,
+ DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 30, 0,
tzinfo=utc)),
+ id="current_time_miss_one_interval_not_on_boundary",
+ ),
+ pytest.param(
+ pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc),
+ pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=utc),
+ DagRunInfo.exact(pendulum.DateTime(2199, 12, 31, 22, 30, 0,
tzinfo=utc)),
+ id="future_start_date",
+ ),
+ ],
+)
[email protected](
+ "timetable",
+ [
+ pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
+ pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
+ ],
+)
+def test_hourly_delta_trigger_no_catchup_next_info(
+ current_time: pendulum.DateTime,
+ earliest: pendulum.DateTime,
+ expected: DagRunInfo,
+ timetable: Timetable,
+) -> None:
+ with time_machine.travel(current_time):
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=PREV_DATA_INTERVAL_EXACT,
+ restriction=TimeRestriction(earliest=earliest, latest=None,
catchup=False),
+ )
+ assert next_info == expected
+
+
@pytest.mark.parametrize(
"last_automated_data_interval, earliest, expected",
[
@@ -171,6 +271,55 @@ def test_hourly_cron_trigger_catchup_next_info(
assert next_info == expected
[email protected](
+ "last_automated_data_interval, earliest, expected",
+ [
+ pytest.param(
+ DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0,
tzinfo=utc)),
+ START_DATE,
+ DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0,
tzinfo=utc)),
+ id="last_automated_on_boundary",
+ ),
+ pytest.param(
+ DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0,
tzinfo=utc)),
+ START_DATE,
+ DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 30, 0,
tzinfo=utc)),
+ id="last_automated_not_on_boundary",
+ ),
+ pytest.param(
+ None,
+ pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc),
+ DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0,
tzinfo=utc)),
+ id="no_last_automated_with_earliest_on_boundary",
+ ),
+ pytest.param(
+ None,
+ pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc),
+ DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0,
tzinfo=utc)),
+ id="no_last_automated_with_earliest_not_on_boundary",
+ ),
+ ],
+)
[email protected](
+ "timetable",
+ [
+ pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
+ pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
+ ],
+)
+def test_hourly_delta_trigger_catchup_next_info(
+ last_automated_data_interval: DataInterval | None,
+ earliest: pendulum.DateTime | None,
+ expected: DagRunInfo | None,
+ timetable: Timetable,
+) -> None:
+ next_info = timetable.next_dagrun_info(
+ last_automated_data_interval=last_automated_data_interval,
+ restriction=TimeRestriction(earliest=earliest, latest=None,
catchup=True),
+ )
+ assert next_info == expected
+
+
def test_cron_trigger_next_info_with_interval():
# Runs every Monday on 16:30, covering the day before the run.
timetable = CronTriggerTimetable(
@@ -192,37 +341,73 @@ def test_cron_trigger_next_info_with_interval():
)
-def test_validate_success() -> None:
- HOURLY_CRON_TRIGGER_TIMETABLE.validate()
-
[email protected](
+ "timetable",
+ [
+ pytest.param(HOURLY_CRON_TRIGGER_TIMETABLE, id="cron"),
+ pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
+ pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
+ ],
+)
+def test_validate_success(timetable: Timetable) -> None:
+ timetable.validate()
-def test_validate_failure() -> None:
- timetable = CronTriggerTimetable("0 0 1 13 0", timezone=utc)
[email protected](
+ "timetable, message",
+ [
+ pytest.param(
+ CronTriggerTimetable("0 0 1 13 0", timezone=utc),
+ "[0 0 1 13 0] is not acceptable, out of range",
+ id="cron",
+ ),
+ pytest.param(
+ DeltaTriggerTimetable(datetime.timedelta(days=-1)),
+ "schedule interval must be positive, not
datetime.timedelta(days=-1)",
+ id="timedelta",
+ ),
+ pytest.param(
+
DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(days=-1)),
+ "schedule interval must be positive, not relativedelta(days=-1)",
+ id="relativedelta",
+ ),
+ ],
+)
+def test_validate_failure(timetable: Timetable, message: str) -> None:
with pytest.raises(AirflowTimetableInvalid) as ctx:
timetable.validate()
- assert str(ctx.value) == "[0 0 1 13 0] is not acceptable, out of range"
+ assert str(ctx.value) == message
@pytest.mark.parametrize(
"timetable, data",
[
- (HOURLY_CRON_TRIGGER_TIMETABLE, {"expression": "0 * * * *",
"timezone": "UTC", "interval": 0}),
- (
+ pytest.param(
+ HOURLY_CRON_TRIGGER_TIMETABLE,
+ {"expression": "0 * * * *", "timezone": "UTC", "interval": 0.0},
+ id="hourly",
+ ),
+ pytest.param(
CronTriggerTimetable("0 0 1 12 *", timezone=utc,
interval=datetime.timedelta(hours=2)),
{"expression": "0 0 1 12 *", "timezone": "UTC", "interval":
7200.0},
+ id="interval",
),
- (
+ pytest.param(
CronTriggerTimetable(
"0 0 1 12 0",
timezone="Asia/Taipei",
interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO),
),
- {"expression": "0 0 1 12 0", "timezone": "Asia/Taipei",
"interval": {"weekday": [0]}},
+ {
+ "expression": "0 0 1 12 0",
+ "timezone": "Asia/Taipei",
+ "interval": {"weekday": [0]},
+ },
+ id="non-utc-interval",
),
],
)
-def test_serialization(timetable: CronTriggerTimetable, data: dict[str,
typing.Any]) -> None:
+def test_cron_trigger_serialization(timetable: CronTriggerTimetable, data:
dict[str, typing.Any]) -> None:
assert timetable.serialize() == data
tt = CronTriggerTimetable.deserialize(data)
@@ -230,3 +415,43 @@ def test_serialization(timetable: CronTriggerTimetable,
data: dict[str, typing.A
assert tt._expression == timetable._expression
assert tt._timezone == timetable._timezone
assert tt._interval == timetable._interval
+
+
[email protected](
+ "timetable, data",
+ [
+ pytest.param(
+ HOURLY_TIMEDELTA_TIMETABLE,
+ {"delta": 3600.0, "interval": 0.0},
+ id="timedelta",
+ ),
+ pytest.param(
+ DeltaTriggerTimetable(
+ datetime.timedelta(hours=3),
+
interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO),
+ ),
+ {"delta": 10800.0, "interval": {"weekday": [0]}},
+ id="timedelta-interval",
+ ),
+ pytest.param(
+ HOURLY_RELATIVEDELTA_TIMETABLE,
+ {"delta": {"hours": 1}, "interval": 0.0},
+ id="relativedelta",
+ ),
+ pytest.param(
+ DeltaTriggerTimetable(
+
dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO),
+ interval=datetime.timedelta(days=7),
+ ),
+ {"delta": {"weekday": [0]}, "interval": 604800.0},
+ id="relativedelta-interval",
+ ),
+ ],
+)
+def test_delta_trigger_serialization(timetable: DeltaTriggerTimetable, data:
dict[str, typing.Any]) -> None:
+ assert timetable.serialize() == data
+
+ tt = DeltaTriggerTimetable.deserialize(data)
+ assert isinstance(tt, DeltaTriggerTimetable)
+ assert tt._delta == timetable._delta
+ assert tt._interval == timetable._interval