This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 74c62f24607 [Backport] Introduce DeltaTriggerTimetable (#47074)
74c62f24607 is described below

commit 74c62f246078fba531d66eb431bbfd603f03dfec
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Feb 27 02:15:17 2025 +0800

    [Backport] Introduce DeltaTriggerTimetable (#47074)
---
 airflow/config_templates/config.yml                |  21 +-
 airflow/timetables/_delta.py                       |  56 +++++
 airflow/timetables/interval.py                     |  30 +--
 airflow/timetables/trigger.py                      | 154 +++++++++----
 .../authoring-and-scheduling/timetable.rst         | 104 ++++++---
 tests/timetables/test_interval_timetable.py        |  25 +-
 tests/timetables/test_trigger_timetable.py         | 251 +++++++++++++++++++--
 7 files changed, 525 insertions(+), 116 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index 04b4e27843f..d90a931db23 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2661,7 +2661,26 @@ scheduler:
       type: boolean
       example: ~
       default: "True"
-      see_also: ":ref:`Differences between the two cron timetables`"
+      see_also: ':ref:`Differences between "trigger" and "data interval" 
timetables`'
+    create_delta_data_intervals:
+      description: |
+        Whether to create DAG runs that span an interval or one single point 
in time when a timedelta or
+        relativedelta is provided to ``schedule`` argument of a DAG.
+
+        * ``True``: **DeltaDataIntervalTimetable** is used, which is suitable 
for DAGs with well-defined data
+          interval. You get contiguous intervals from the end of the previous 
interval up to the scheduled
+          datetime.
+        * ``False``: **DeltaTriggerTimetable** is used, which is suitable for 
DAGs that simply want to say
+          e.g. "run this every day" and do not care about the data interval.
+
+        Notably, for **DeltaTriggerTimetable**, the logical date is the same 
as the time the DAG Run will
+        try to schedule, while for **DeltaDataIntervalTimetable**, the logical 
date is the beginning of
+        the data interval, but the DAG Run will try to schedule at the end of 
the data interval.
+      version_added: 2.11.0
+      type: boolean
+      example: ~
+      default: "True"
+      see_also: ':ref:`Differences between "trigger" and "data interval" 
timetables`'
 triggerer:
   description: ~
   options:
diff --git a/airflow/timetables/_delta.py b/airflow/timetables/_delta.py
new file mode 100644
index 00000000000..7203cd40631
--- /dev/null
+++ b/airflow/timetables/_delta.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import datetime
+from typing import TYPE_CHECKING
+
+from airflow.exceptions import AirflowTimetableInvalid
+from airflow.utils.timezone import convert_to_utc
+
+if TYPE_CHECKING:
+    from dateutil.relativedelta import relativedelta
+    from pendulum import DateTime
+
+
+class DeltaMixin:
+    """Mixin to provide interface to work with timedelta and relativedelta."""
+
+    def __init__(self, delta: datetime.timedelta | relativedelta) -> None:
+        self._delta = delta
+
+    @property
+    def summary(self) -> str:
+        return str(self._delta)
+
+    def validate(self) -> None:
+        now = datetime.datetime.now()
+        if (now + self._delta) <= now:
+            raise AirflowTimetableInvalid(f"schedule interval must be 
positive, not {self._delta!r}")
+
+    def _get_next(self, current: DateTime) -> DateTime:
+        return convert_to_utc(current + self._delta)
+
+    def _get_prev(self, current: DateTime) -> DateTime:
+        return convert_to_utc(current - self._delta)
+
+    def _align_to_next(self, current: DateTime) -> DateTime:
+        return current
+
+    def _align_to_prev(self, current: DateTime) -> DateTime:
+        return current
diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index ef4b5d0afc4..81d53e35378 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -22,10 +22,10 @@ from typing import TYPE_CHECKING, Any, Union
 from dateutil.relativedelta import relativedelta
 from pendulum import DateTime
 
-from airflow.exceptions import AirflowTimetableInvalid
 from airflow.timetables._cron import CronMixin
+from airflow.timetables._delta import DeltaMixin
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
-from airflow.utils.timezone import coerce_datetime, convert_to_utc, utcnow
+from airflow.utils.timezone import coerce_datetime, utcnow
 
 if TYPE_CHECKING:
     from airflow.timetables.base import TimeRestriction
@@ -173,7 +173,7 @@ class CronDataIntervalTimetable(CronMixin, 
_DataIntervalTimetable):
         return DataInterval(start=self._get_prev(end), end=end)
 
 
-class DeltaDataIntervalTimetable(_DataIntervalTimetable):
+class DeltaDataIntervalTimetable(DeltaMixin, _DataIntervalTimetable):
     """
     Timetable that schedules data intervals with a time delta.
 
@@ -182,9 +182,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
     instance.
     """
 
-    def __init__(self, delta: Delta) -> None:
-        self._delta = delta
-
     @classmethod
     def deserialize(cls, data: dict[str, Any]) -> Timetable:
         from airflow.serialization.serialized_objects import 
decode_relativedelta
@@ -204,10 +201,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
             return NotImplemented
         return self._delta == other._delta
 
-    @property
-    def summary(self) -> str:
-        return str(self._delta)
-
     def serialize(self) -> dict[str, Any]:
         from airflow.serialization.serialized_objects import 
encode_relativedelta
 
@@ -218,23 +211,6 @@ class DeltaDataIntervalTimetable(_DataIntervalTimetable):
             delta = encode_relativedelta(self._delta)
         return {"delta": delta}
 
-    def validate(self) -> None:
-        now = datetime.datetime.now()
-        if (now + self._delta) <= now:
-            raise AirflowTimetableInvalid(f"schedule interval must be 
positive, not {self._delta!r}")
-
-    def _get_next(self, current: DateTime) -> DateTime:
-        return convert_to_utc(current + self._delta)
-
-    def _get_prev(self, current: DateTime) -> DateTime:
-        return convert_to_utc(current - self._delta)
-
-    def _align_to_next(self, current: DateTime) -> DateTime:
-        return current
-
-    def _align_to_prev(self, current: DateTime) -> DateTime:
-        return current
-
     @staticmethod
     def _relativedelta_in_seconds(delta: relativedelta) -> int:
         return (
diff --git a/airflow/timetables/trigger.py b/airflow/timetables/trigger.py
index a4666946fa7..8dad7dfecd1 100644
--- a/airflow/timetables/trigger.py
+++ b/airflow/timetables/trigger.py
@@ -20,8 +20,9 @@ import datetime
 from typing import TYPE_CHECKING, Any
 
 from airflow.timetables._cron import CronMixin
+from airflow.timetables._delta import DeltaMixin
 from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
-from airflow.utils import timezone
+from airflow.utils.timezone import coerce_datetime, utcnow
 
 if TYPE_CHECKING:
     from dateutil.relativedelta import relativedelta
@@ -31,60 +32,43 @@ if TYPE_CHECKING:
     from airflow.timetables.base import TimeRestriction
 
 
-class CronTriggerTimetable(CronMixin, Timetable):
-    """
-    Timetable that triggers DAG runs according to a cron expression.
-
-    This is different from ``CronDataIntervalTimetable``, where the cron
-    expression specifies the *data interval* of a DAG run. With this timetable,
-    the data intervals are specified independently from the cron expression.
-    Also for the same reason, this timetable kicks off a DAG run immediately at
-    the start of the period (similar to POSIX cron), instead of needing to wait
-    for one data interval to pass.
+def _serialize_interval(interval: datetime.timedelta | relativedelta) -> float 
| dict:
+    from airflow.serialization.serialized_objects import encode_relativedelta
 
-    Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
-    """
+    if isinstance(interval, datetime.timedelta):
+        return interval.total_seconds()
+    return encode_relativedelta(interval)
 
-    def __init__(
-        self,
-        cron: str,
-        *,
-        timezone: str | Timezone | FixedTimezone,
-        interval: datetime.timedelta | relativedelta = datetime.timedelta(),
-    ) -> None:
-        super().__init__(cron, timezone)
-        self._interval = interval
 
-    @classmethod
-    def deserialize(cls, data: dict[str, Any]) -> Timetable:
-        from airflow.serialization.serialized_objects import 
decode_relativedelta, decode_timezone
+def _deserialize_interval(value: int | dict) -> datetime.timedelta | 
relativedelta:
+    from airflow.serialization.serialized_objects import decode_relativedelta
 
-        interval: datetime.timedelta | relativedelta
-        if isinstance(data["interval"], dict):
-            interval = decode_relativedelta(data["interval"])
-        else:
-            interval = datetime.timedelta(seconds=data["interval"])
-        return cls(data["expression"], 
timezone=decode_timezone(data["timezone"]), interval=interval)
+    if isinstance(value, dict):
+        return decode_relativedelta(value)
+    return datetime.timedelta(seconds=value)
 
-    def serialize(self) -> dict[str, Any]:
-        from airflow.serialization.serialized_objects import 
encode_relativedelta, encode_timezone
 
-        interval: float | dict[str, Any]
-        if isinstance(self._interval, datetime.timedelta):
-            interval = self._interval.total_seconds()
-        else:
-            interval = encode_relativedelta(self._interval)
-        timezone = encode_timezone(self._timezone)
-        return {"expression": self._expression, "timezone": timezone, 
"interval": interval}
+class _TriggerTimetable(Timetable):
+    _interval: datetime.timedelta | relativedelta
 
     def infer_manual_data_interval(self, *, run_after: DateTime) -> 
DataInterval:
         return DataInterval(
-            # pendulum.Datetime ± timedelta should return pendulum.Datetime
-            # however mypy decide that output would be datetime.datetime
-            run_after - self._interval,  # type: ignore[arg-type]
+            coerce_datetime(run_after - self._interval),
             run_after,
         )
 
+    def _align_to_next(self, current: DateTime) -> DateTime:
+        raise NotImplementedError()
+
+    def _align_to_prev(self, current: DateTime) -> DateTime:
+        raise NotImplementedError()
+
+    def _get_next(self, current: DateTime) -> DateTime:
+        raise NotImplementedError()
+
+    def _get_prev(self, current: DateTime) -> DateTime:
+        raise NotImplementedError()
+
     def next_dagrun_info(
         self,
         *,
@@ -99,7 +83,7 @@ class CronTriggerTimetable(CronMixin, Timetable):
             else:
                 next_start_time = self._align_to_next(restriction.earliest)
         else:
-            start_time_candidates = 
[self._align_to_prev(timezone.coerce_datetime(timezone.utcnow()))]
+            start_time_candidates = 
[self._align_to_prev(coerce_datetime(utcnow()))]
             if last_automated_data_interval is not None:
                 
start_time_candidates.append(self._get_next(last_automated_data_interval.end))
             if restriction.earliest is not None:
@@ -113,3 +97,85 @@ class CronTriggerTimetable(CronMixin, Timetable):
             next_start_time - self._interval,  # type: ignore[arg-type]
             next_start_time,
         )
+
+
+class DeltaTriggerTimetable(DeltaMixin, _TriggerTimetable):
+    """
+    Timetable that triggers DAG runs according to a time delta.
+
+    This is different from ``DeltaDataIntervalTimetable``, where the delta 
value
+    specifies the *data interval* of a DAG run. With this timetable, the data
+    intervals are specified independently. Also for the same reason, this
+    timetable kicks off a DAG run immediately at the start of the period,
+    instead of needing to wait for one data interval to pass.
+
+    :param delta: How much time to wait between each run.
+    :param interval: The data interval of each run. Default is 0.
+    """
+
+    def __init__(
+        self,
+        delta: datetime.timedelta | relativedelta,
+        *,
+        interval: datetime.timedelta | relativedelta = datetime.timedelta(),
+    ) -> None:
+        super().__init__(delta)
+        self._interval = interval
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        return cls(
+            _deserialize_interval(data["delta"]),
+            interval=_deserialize_interval(data["interval"]),
+        )
+
+    def serialize(self) -> dict[str, Any]:
+        return {
+            "delta": _serialize_interval(self._delta),
+            "interval": _serialize_interval(self._interval),
+        }
+
+
+class CronTriggerTimetable(CronMixin, _TriggerTimetable):
+    """
+    Timetable that triggers DAG runs according to a cron expression.
+
+    This is different from ``CronDataIntervalTimetable``, where the cron
+    expression specifies the *data interval* of a DAG run. With this timetable,
+    the data intervals are specified independently from the cron expression.
+    Also for the same reason, this timetable kicks off a DAG run immediately at
+    the start of the period (similar to POSIX cron), instead of needing to wait
+    for one data interval to pass.
+
+    Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
+    """
+
+    def __init__(
+        self,
+        cron: str,
+        *,
+        timezone: str | Timezone | FixedTimezone,
+        interval: datetime.timedelta | relativedelta = datetime.timedelta(),
+    ) -> None:
+        super().__init__(cron, timezone)
+        self._interval = interval
+
+    @classmethod
+    def deserialize(cls, data: dict[str, Any]) -> Timetable:
+        from airflow.serialization.serialized_objects import 
decode_relativedelta, decode_timezone
+
+        interval: datetime.timedelta | relativedelta
+        if isinstance(data["interval"], dict):
+            interval = decode_relativedelta(data["interval"])
+        else:
+            interval = datetime.timedelta(seconds=data["interval"])
+        return cls(data["expression"], 
timezone=decode_timezone(data["timezone"]), interval=interval)
+
+    def serialize(self) -> dict[str, Any]:
+        from airflow.serialization.serialized_objects import encode_timezone
+
+        return {
+            "expression": self._expression,
+            "timezone": encode_timezone(self._timezone),
+            "interval": _serialize_interval(self._interval),
+        }
diff --git a/docs/apache-airflow/authoring-and-scheduling/timetable.rst 
b/docs/apache-airflow/authoring-and-scheduling/timetable.rst
index d9b47dd9c44..698aeb850e4 100644
--- a/docs/apache-airflow/authoring-and-scheduling/timetable.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/timetable.rst
@@ -64,6 +64,53 @@ Built-in Timetables
 Airflow comes with several common timetables built-in to cover the most common 
use cases. Additional timetables
 may be available in plugins.
 
+.. _DeltaTriggerTimetable:
+
+DeltaTriggerTimetable
+^^^^^^^^^^^^^^^^^^^^^
+
+A timetable that accepts a :class:`datetime.timedelta` or 
``dateutil.relativedelta.relativedelta``, and runs
+the DAG once a delta passes.
+
+.. seealso:: `Differences between "trigger" and "data interval" timetables`_
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from airflow.timetables.trigger import DeltaTriggerTimetable
+
+
+    @dag(schedule=DeltaTriggerTimetable(timedelta(days=7)), ...)  # Once every 
week.
+    def example_dag():
+        pass
+
+You can also provide a static data interval to the timetable. The optional 
``interval`` argument also
+should be a :class:`datetime.timedelta` or 
``dateutil.relativedelta.relativedelta``. When using these
+arguments, a triggered DAG run's data interval spans the specified duration, 
and *ends* with the trigger time.
+
+.. code-block:: python
+
+    from datetime import UTC, datetime, timedelta
+
+    from dateutil.relativedelta import relativedelta, FR
+
+    from airflow.timetables.trigger import DeltaTriggerTimetable
+
+
+    @dag(
+        # Runs every Friday at 18:00 to cover the work week.
+        schedule=DeltaTriggerTimetable(
+            relativedelta(weekday=FR(), hour=18),
+            interval=timedelta(days=4, hours=9),
+        ),
+        start_date=datetime(2025, 1, 3, 18, tzinfo=UTC),
+        ...,
+    )
+    def example_dag():
+        pass
+
+
 .. _CronTriggerTimetable:
 
 CronTriggerTimetable
@@ -71,7 +118,7 @@ CronTriggerTimetable
 
 A timetable that accepts a cron expression, and triggers DAG runs according to 
it.
 
-.. seealso:: `Differences between the two cron timetables`_
+.. seealso:: `Differences between "trigger" and "data interval" timetables`_
 
 .. code-block:: python
 
@@ -132,7 +179,7 @@ CronDataIntervalTimetable
 A timetable that accepts a cron expression, creates data intervals according 
to the interval between each cron
 trigger points, and triggers a DAG run at the end of each data interval.
 
-.. seealso:: `Differences between the two cron timetables`_
+.. seealso:: `Differences between "trigger" and "data interval" timetables`_
 .. seealso:: `Differences between the cron and delta data interval timetables`_
 
 Select this timetable by providing a valid cron expression as a string to the 
``schedule``
@@ -209,37 +256,39 @@ Here's an example of a DAG using 
``DatasetOrTimeSchedule``:
 Timetables comparisons
 ----------------------
 
-.. _Differences between the two cron timetables:
+.. _Differences between "trigger" and "data interval" timetables:
+
+Differences between "trigger" and "data interval" timetables
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-Differences between the two cron timetables
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Airflow has two sets of timetables for cron and delta schedules:
 
-Airflow has two timetables `CronTriggerTimetable`_ and 
`CronDataIntervalTimetable`_ that accept a cron expression.
+* CronTriggerTimetable_ and CronDataIntervalTimetable_ both accept a cron 
expression.
+* DeltaTriggerTimetable_ and DeltaDataIntervalTimetable_ both accept a 
timedelta or relativedelta.
 
-However, there are differences between the two:
-- `CronTriggerTimetable`_ does not address *Data Interval*, while 
`CronDataIntervalTimetable`_ does.
-- The timestamp in the ``run_id``, the ``logical_date`` for 
`CronTriggerTimetable`_ and `CronDataIntervalTimetable`_  are defined 
differently based on how they handle the data interval, as described in 
:ref:`timetables_run_id_logical_date`.
+- A trigger timetable (CronTriggerTimetable_ or DeltaTriggerTimetable_) does 
not address the concept of *data interval*, while a "data interval" one 
(CronDataIntervalTimetable_ or DeltaDataIntervalTimetable_) does.
+- The timestamp in the ``run_id``, the ``logical_date`` of the two timetable 
kinds are defined differently based on how they handle the data interval, as 
described in :ref:`timetables_run_id_logical_date`.
 
 Whether taking care of *Data Interval*
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-`CronTriggerTimetable`_ *does not* include *data interval*. This means that 
the value of ``data_interval_start`` and
-``data_interval_end`` (and the legacy ``execution_date``) are the same; the 
time when a DAG run is triggered.
+A trigger timetable *does not* include *data interval*. This means that the 
value of ``data_interval_start``
+and ``data_interval_end`` (and the legacy ``execution_date``) are the same; 
the time when a DAG run is
+triggered.
 
-However, `CronDataIntervalTimetable`_ *does* include *data interval*. This 
means the value of
-``data_interval_start`` and ``data_interval_end`` (and legacy 
``execution_date``) are different. ``data_interval_start`` is the time when a
-DAG run is triggered and ``data_interval_end`` is the end of the interval.
+For a data interval timetable, the value of ``data_interval_start`` and 
``data_interval_end`` (and legacy
+``execution_date``) are different. ``data_interval_start`` is the time when a 
DAG run is triggered and
+``data_interval_end`` is the end of the interval.
 
 *Catchup* behavior
 ^^^^^^^^^^^^^^^^^^
 
-Whether you're using `CronTriggerTimetable`_ or `CronDataIntervalTimetable`_,  
there is no difference when ``catchup`` is ``True``.
-
 You might want to use ``False`` for ``catchup`` for certain scenarios, to 
prevent running unnecessary DAGs:
 - If you create a new DAG with a start date in the past, and don't want to run 
DAGs for the past. If ``catchup`` is ``True``, Airflow runs all DAGs that would 
have run in that time interval.
- If you pause an existing DAG, and then restart it at a later date, and don't 
want to run DAGs for the period it was paused. If ``catchup`` is ``True``, 
Airflow runs all DAGs that would have run during that period.
 
-In these scenarios, the ``logical_date`` in the ``run_id`` are based on how 
`CronTriggerTimetable`_ or `CronDataIntervalTimetable`_ handle the data 
interval.
+In these scenarios, the ``logical_date`` in the ``run_id`` is based on how the 
timetable handles the data
+interval.
 
 See :ref:`dag-catchup` for more information about how DAG runs are triggered 
when using ``catchup``.
 
@@ -248,30 +297,29 @@ See :ref:`dag-catchup` for more information about how DAG 
runs are triggered whe
 The time when a DAG run is triggered
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-`CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ trigger DAG runs at 
the same time. However, the timestamp for the ``run_id`` is different for each.
-
-- `CronTriggerTimetable`_ has a ``run_id`` timestamp, the ``logical_date``, 
showing when DAG run is able to start.
-- `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ trigger DAG runs at 
the same time. However, the timestamp for the ``run_id`` (``logical_date``) is 
different for each.
+Both trigger and data interval timetables trigger DAG runs at the same time. 
However, the timestamp for the
+``run_id`` is different for each. This is because ``run_id`` is based on 
``logical_date``.
 
 For example, suppose there is a cron expression ``@daily`` or ``0 0 * * *``, 
which is scheduled to run at 12AM every day. If you enable DAGs using the two 
timetables at 3PM on January
 31st,
-- `CronTriggerTimetable`_ triggers a new DAG run at 12AM on February 1st. The 
``run_id`` timestamp is midnight, on February 1st.
-- `CronDataIntervalTimetable`_ immediately triggers a new DAG run, because a 
DAG run for the daily time interval beginning at 12AM on January 31st did not 
occur yet. The ``run_id`` timestamp is midnight, on January 31st, since that is 
the beginning of the data interval.
+- `CronTriggerTimetable`_ creates a new DAG run at 12AM on February 1st. The 
``run_id`` timestamp is midnight, on February 1st.
+- `CronDataIntervalTimetable`_ immediately creates a new DAG run, because a 
DAG run for the daily time interval beginning at 12AM on January 31st did not 
occur yet. The ``run_id`` timestamp is midnight, on January 31st, since that is 
the beginning of the data interval.
 
-This is another example showing the difference in the case of skipping DAG 
runs.
+The following is another example showing the difference in the case of 
skipping DAG runs:
 
 Suppose there are two running DAGs with a cron expression ``@daily`` or ``0 0 
* * *`` that use the two different timetables. If you pause the DAGs at 3PM on 
January 31st and re-enable them at 3PM on February 2nd,
 - `CronTriggerTimetable`_ skips the DAG runs that were supposed to trigger on 
February 1st and 2nd. The next DAG run will be triggered at 12AM on February 
3rd.
 - `CronDataIntervalTimetable`_ skips the DAG runs that were supposed to 
trigger on February 1st only. A DAG run for February 2nd is immediately 
triggered after you re-enable the DAG.
 
-In these examples, you see how `CronTriggerTimetable`_ triggers DAG runs is 
more intuitive and more similar to what
-people expect cron to behave than how `CronDataIntervalTimetable`_ does.
+In these examples, you see how a trigger timetable creates DAG runs more 
intuitively and similar to what
+people expect a workflow to behave, while a data interval timetable is 
designed heavily around the data
+interval it processes, and does not reflect a workflow's own properties.
 
 
 .. _Differences between the cron and delta data interval timetables:
 
-Differences between the cron and delta data interval timetables:
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Differences between the cron and delta data interval timetables
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 Choosing between `DeltaDataIntervalTimetable`_ and 
`CronDataIntervalTimetable`_ depends on your use case.
 If you enable a DAG at 01:05 on February 1st, the following table summarizes 
the DAG runs created and the
diff --git a/tests/timetables/test_interval_timetable.py 
b/tests/timetables/test_interval_timetable.py
index 90883206551..3c40174d8bf 100644
--- a/tests/timetables/test_interval_timetable.py
+++ b/tests/timetables/test_interval_timetable.py
@@ -181,9 +181,28 @@ def test_validate_failure(timetable: Timetable, 
error_message: str) -> None:
     assert str(ctx.value) == error_message
 
 
-def test_cron_interval_timezone_from_string():
-    timetable = CronDataIntervalTimetable("@hourly", "UTC")
-    assert timetable.serialize()["timezone"] == "UTC"
+def test_cron_interval_serialize():
+    data = HOURLY_CRON_TIMETABLE.serialize()
+    assert data == {"expression": "0 * * * *", "timezone": "UTC"}
+    tt = CronDataIntervalTimetable.deserialize(data)
+    assert isinstance(tt, CronDataIntervalTimetable)
+    assert tt._expression == HOURLY_CRON_TIMETABLE._expression
+    assert tt._timezone == HOURLY_CRON_TIMETABLE._timezone
+
+
[email protected](
+    "timetable, expected_data",
+    [
+        (HOURLY_RELATIVEDELTA_TIMETABLE, {"delta": {"hours": 1}}),
+        (HOURLY_TIMEDELTA_TIMETABLE, {"delta": 3600.0}),
+    ],
+)
+def test_delta_interval_serialize(timetable, expected_data):
+    data = timetable.serialize()
+    assert data == expected_data
+    tt = DeltaDataIntervalTimetable.deserialize(data)
+    assert isinstance(tt, DeltaDataIntervalTimetable)
+    assert tt._delta == timetable._delta
 
 
 @pytest.mark.parametrize(
diff --git a/tests/timetables/test_trigger_timetable.py 
b/tests/timetables/test_trigger_timetable.py
index 5165a14b3c1..a3e4b3fe6ec 100644
--- a/tests/timetables/test_trigger_timetable.py
+++ b/tests/timetables/test_trigger_timetable.py
@@ -25,8 +25,8 @@ import pytest
 import time_machine
 
 from airflow.exceptions import AirflowTimetableInvalid
-from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction
-from airflow.timetables.trigger import CronTriggerTimetable
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, 
Timetable
+from airflow.timetables.trigger import CronTriggerTimetable, 
DeltaTriggerTimetable
 from airflow.utils.timezone import utc
 
 START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=utc)
@@ -39,6 +39,9 @@ YESTERDAY = CURRENT_TIME - datetime.timedelta(days=1)
 
 HOURLY_CRON_TRIGGER_TIMETABLE = CronTriggerTimetable("@hourly", timezone=utc)
 
+HOURLY_TIMEDELTA_TIMETABLE = DeltaTriggerTimetable(datetime.timedelta(hours=1))
+HOURLY_RELATIVEDELTA_TIMETABLE = 
DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(hours=1))
+
 DELTA_FROM_MIDNIGHT = datetime.timedelta(minutes=30, hours=16)
 
 
@@ -76,6 +79,47 @@ def 
test_daily_cron_trigger_no_catchup_first_starts_at_next_schedule(
     assert next_info == DagRunInfo.exact(next_start_time)
 
 
[email protected](
+    "last_automated_data_interval, next_start_time",
+    [
+        pytest.param(
+            None,
+            CURRENT_TIME,
+            id="first-run",
+        ),
+        pytest.param(
+            DataInterval.exact(YESTERDAY + DELTA_FROM_MIDNIGHT),
+            CURRENT_TIME + DELTA_FROM_MIDNIGHT,
+            id="before-now",
+        ),
+        pytest.param(
+            DataInterval.exact(CURRENT_TIME + DELTA_FROM_MIDNIGHT),
+            CURRENT_TIME + datetime.timedelta(days=1) + DELTA_FROM_MIDNIGHT,
+            id="after-now",
+        ),
+    ],
+)
[email protected](
+    "timetable",
+    [
+        pytest.param(DeltaTriggerTimetable(datetime.timedelta(days=1)), 
id="timedelta"),
+        
pytest.param(DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(days=1)),
 id="relativedelta"),
+    ],
+)
+@time_machine.travel(CURRENT_TIME)
+def test_daily_delta_trigger_no_catchup_first_starts_at_next_schedule(
+    last_automated_data_interval: DataInterval | None,
+    next_start_time: pendulum.DateTime,
+    timetable: Timetable,
+) -> None:
+    """If ``catchup=False`` and start_date is a day before"""
+    next_info = timetable.next_dagrun_info(
+        last_automated_data_interval=last_automated_data_interval,
+        restriction=TimeRestriction(earliest=YESTERDAY, latest=None, 
catchup=False),
+    )
+    assert next_info == DagRunInfo.exact(next_start_time)
+
+
 @pytest.mark.parametrize(
     "current_time, earliest, expected",
     [
@@ -124,6 +168,62 @@ def test_hourly_cron_trigger_no_catchup_next_info(
     assert next_info == expected
 
 
[email protected](
+    "current_time, earliest, expected",
+    [
+        pytest.param(
+            pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, 
tzinfo=utc)),
+            id="current_time_on_boundary",
+        ),
+        pytest.param(
+            pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, 
tzinfo=utc)),
+            id="current_time_not_on_boundary",
+        ),
+        pytest.param(
+            pendulum.DateTime(2022, 7, 27, 1, 0, 0, tzinfo=utc),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, 
tzinfo=utc)),
+            id="current_time_miss_one_interval_on_boundary",
+        ),
+        pytest.param(
+            pendulum.DateTime(2022, 7, 27, 1, 30, 0, tzinfo=utc),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 30, 0, 
tzinfo=utc)),
+            id="current_time_miss_one_interval_not_on_boundary",
+        ),
+        pytest.param(
+            pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc),
+            pendulum.DateTime(2199, 12, 31, 22, 30, 0, tzinfo=utc),
+            DagRunInfo.exact(pendulum.DateTime(2199, 12, 31, 22, 30, 0, 
tzinfo=utc)),
+            id="future_start_date",
+        ),
+    ],
+)
[email protected](
+    "timetable",
+    [
+        pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
+        pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
+    ],
+)
+def test_hourly_delta_trigger_no_catchup_next_info(
+    current_time: pendulum.DateTime,
+    earliest: pendulum.DateTime,
+    expected: DagRunInfo,
+    timetable: Timetable,
+) -> None:
+    with time_machine.travel(current_time):
+        next_info = timetable.next_dagrun_info(
+            last_automated_data_interval=PREV_DATA_INTERVAL_EXACT,
+            restriction=TimeRestriction(earliest=earliest, latest=None, 
catchup=False),
+        )
+    assert next_info == expected
+
+
 @pytest.mark.parametrize(
     "last_automated_data_interval, earliest, expected",
     [
@@ -171,6 +271,55 @@ def test_hourly_cron_trigger_catchup_next_info(
     assert next_info == expected
 
 
[email protected](
+    "last_automated_data_interval, earliest, expected",
+    [
+        pytest.param(
+            DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, 
tzinfo=utc)),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 0, 0, 
tzinfo=utc)),
+            id="last_automated_on_boundary",
+        ),
+        pytest.param(
+            DataInterval.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, 
tzinfo=utc)),
+            START_DATE,
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 1, 30, 0, 
tzinfo=utc)),
+            id="last_automated_not_on_boundary",
+        ),
+        pytest.param(
+            None,
+            pendulum.DateTime(2022, 7, 27, 0, 0, 0, tzinfo=utc),
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 0, 0, 
tzinfo=utc)),
+            id="no_last_automated_with_earliest_on_boundary",
+        ),
+        pytest.param(
+            None,
+            pendulum.DateTime(2022, 7, 27, 0, 30, 0, tzinfo=utc),
+            DagRunInfo.exact(pendulum.DateTime(2022, 7, 27, 0, 30, 0, 
tzinfo=utc)),
+            id="no_last_automated_with_earliest_not_on_boundary",
+        ),
+    ],
+)
[email protected](
+    "timetable",
+    [
+        pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
+        pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
+    ],
+)
+def test_hourly_delta_trigger_catchup_next_info(
+    last_automated_data_interval: DataInterval | None,
+    earliest: pendulum.DateTime | None,
+    expected: DagRunInfo | None,
+    timetable: Timetable,
+) -> None:
+    next_info = timetable.next_dagrun_info(
+        last_automated_data_interval=last_automated_data_interval,
+        restriction=TimeRestriction(earliest=earliest, latest=None, 
catchup=True),
+    )
+    assert next_info == expected
+
+
 def test_cron_trigger_next_info_with_interval():
     # Runs every Monday on 16:30, covering the day before the run.
     timetable = CronTriggerTimetable(
@@ -192,37 +341,73 @@ def test_cron_trigger_next_info_with_interval():
     )
 
 
-def test_validate_success() -> None:
-    HOURLY_CRON_TRIGGER_TIMETABLE.validate()
-
[email protected](
+    "timetable",
+    [
+        pytest.param(HOURLY_CRON_TRIGGER_TIMETABLE, id="cron"),
+        pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
+        pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
+    ],
+)
+def test_validate_success(timetable: Timetable) -> None:
+    timetable.validate()
 
-def test_validate_failure() -> None:
-    timetable = CronTriggerTimetable("0 0 1 13 0", timezone=utc)
 
[email protected](
+    "timetable, message",
+    [
+        pytest.param(
+            CronTriggerTimetable("0 0 1 13 0", timezone=utc),
+            "[0 0 1 13 0] is not acceptable, out of range",
+            id="cron",
+        ),
+        pytest.param(
+            DeltaTriggerTimetable(datetime.timedelta(days=-1)),
+            "schedule interval must be positive, not 
datetime.timedelta(days=-1)",
+            id="timedelta",
+        ),
+        pytest.param(
+            
DeltaTriggerTimetable(dateutil.relativedelta.relativedelta(days=-1)),
+            "schedule interval must be positive, not relativedelta(days=-1)",
+            id="relativedelta",
+        ),
+    ],
+)
+def test_validate_failure(timetable: Timetable, message: str) -> None:
     with pytest.raises(AirflowTimetableInvalid) as ctx:
         timetable.validate()
-    assert str(ctx.value) == "[0 0 1 13 0] is not acceptable, out of range"
+    assert str(ctx.value) == message
 
 
 @pytest.mark.parametrize(
     "timetable, data",
     [
-        (HOURLY_CRON_TRIGGER_TIMETABLE, {"expression": "0 * * * *", 
"timezone": "UTC", "interval": 0}),
-        (
+        pytest.param(
+            HOURLY_CRON_TRIGGER_TIMETABLE,
+            {"expression": "0 * * * *", "timezone": "UTC", "interval": 0.0},
+            id="hourly",
+        ),
+        pytest.param(
             CronTriggerTimetable("0 0 1 12 *", timezone=utc, 
interval=datetime.timedelta(hours=2)),
             {"expression": "0 0 1 12 *", "timezone": "UTC", "interval": 
7200.0},
+            id="interval",
         ),
-        (
+        pytest.param(
             CronTriggerTimetable(
                 "0 0 1 12 0",
                 timezone="Asia/Taipei",
                 
interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO),
             ),
-            {"expression": "0 0 1 12 0", "timezone": "Asia/Taipei", 
"interval": {"weekday": [0]}},
+            {
+                "expression": "0 0 1 12 0",
+                "timezone": "Asia/Taipei",
+                "interval": {"weekday": [0]},
+            },
+            id="non-utc-interval",
         ),
     ],
 )
-def test_serialization(timetable: CronTriggerTimetable, data: dict[str, 
typing.Any]) -> None:
+def test_cron_trigger_serialization(timetable: CronTriggerTimetable, data: 
dict[str, typing.Any]) -> None:
     assert timetable.serialize() == data
 
     tt = CronTriggerTimetable.deserialize(data)
@@ -230,3 +415,43 @@ def test_serialization(timetable: CronTriggerTimetable, 
data: dict[str, typing.A
     assert tt._expression == timetable._expression
     assert tt._timezone == timetable._timezone
     assert tt._interval == timetable._interval
+
+
[email protected](
+    "timetable, data",
+    [
+        pytest.param(
+            HOURLY_TIMEDELTA_TIMETABLE,
+            {"delta": 3600.0, "interval": 0.0},
+            id="timedelta",
+        ),
+        pytest.param(
+            DeltaTriggerTimetable(
+                datetime.timedelta(hours=3),
+                
interval=dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO),
+            ),
+            {"delta": 10800.0, "interval": {"weekday": [0]}},
+            id="timedelta-interval",
+        ),
+        pytest.param(
+            HOURLY_RELATIVEDELTA_TIMETABLE,
+            {"delta": {"hours": 1}, "interval": 0.0},
+            id="relativedelta",
+        ),
+        pytest.param(
+            DeltaTriggerTimetable(
+                
dateutil.relativedelta.relativedelta(weekday=dateutil.relativedelta.MO),
+                interval=datetime.timedelta(days=7),
+            ),
+            {"delta": {"weekday": [0]}, "interval": 604800.0},
+            id="relativedelta-interval",
+        ),
+    ],
+)
+def test_delta_trigger_serialization(timetable: DeltaTriggerTimetable, data: 
dict[str, typing.Any]) -> None:
+    assert timetable.serialize() == data
+
+    tt = DeltaTriggerTimetable.deserialize(data)
+    assert isinstance(tt, DeltaTriggerTimetable)
+    assert tt._delta == timetable._delta
+    assert tt._interval == timetable._interval


Reply via email to