This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 582e0d53af Events Timetable (#22332)
582e0d53af is described below

commit 582e0d53af78f881cc0f9e5b063bef11f18f7999
Author: Collin McNulty <[email protected]>
AuthorDate: Fri Apr 8 15:38:23 2022 -0500

    Events Timetable (#22332)
    
    This Timetable will be widely useful for timing based on sporting events, planned communication campaigns,
    and other schedules that are arbitrary and irregular but predictable.
---
 airflow/timetables/events.py              | 114 +++++++++++++++++++++++++++
 tests/timetables/test_events_timetable.py | 124 ++++++++++++++++++++++++++++++
 2 files changed, 238 insertions(+)

diff --git a/airflow/timetables/events.py b/airflow/timetables/events.py
new file mode 100644
index 0000000000..8024045f01
--- /dev/null
+++ b/airflow/timetables/events.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+from typing import Iterable, Optional
+
+import pendulum
+from pendulum import DateTime
+
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+
+
+class EventsTimetable(Timetable):
+    """
+    Timetable that schedules DAG runs at specific listed datetimes. Suitable for
+    predictable but truly irregular scheduling such as sporting events.
+
+    :param event_dates: List of datetimes for the DAG to run at. Duplicates will be ignored. Must be finite
+                        and of reasonable size as it will be loaded in its entirety.
+    :param restrict_to_events: Whether manual runs should use the most recent event or
+        the current time
+    :param presorted: if True, event_dates will be assumed to be in ascending order. Provides modest
+        performance improvement for larger lists of event_dates.
+    :param description: A name for the timetable to display in the UI. Default None will be shown as
+                        "X Events" where X is the len of event_dates
+    """
+
+    def __init__(
+        self,
+        event_dates: Iterable[DateTime],
+        restrict_to_events: bool = False,
+        presorted: bool = False,
+        description: Optional[str] = None,
+    ):
+
+        self.event_dates = list(event_dates)  # Must be reversible and indexable
+        if not presorted:
+            # For long lists this could take a while, so only want to do it once
+            self.event_dates = sorted(self.event_dates)
+        self.restrict_to_events = restrict_to_events
+        if description is None:
+            self.description = (
+                f"{len(self.event_dates)} Events between {self.event_dates[0]} and {self.event_dates[-1]}"
+            )
+            self._summary = f"{len(self.event_dates)} Events"
+        else:
+            self._summary = description
+            self.description = description
+
+    @property
+    def summary(self) -> str:
+        return self._summary
+
+    def __repr__(self):
+        return self.summary
+
+    def next_dagrun_info(
+        self,
+        *,
+        last_automated_data_interval: Optional[DataInterval],
+        restriction: TimeRestriction,
+    ) -> Optional[DagRunInfo]:
+        if last_automated_data_interval is None:
+            next_event = self.event_dates[0]
+        else:
+            future_dates = itertools.dropwhile(
+                lambda when: when <= last_automated_data_interval.end, self.event_dates  # type: ignore
+            )
+            next_event = next(future_dates, None)  # type: ignore
+            if next_event is None:
+                return None
+
+        return DagRunInfo.exact(next_event)
+
+    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
+        # If Timetable not restricted to events, run for the time specified
+        if not self.restrict_to_events:
+            return DataInterval.exact(run_after)
+
+        # If restricted to events, run for the most recent past event
+        # or for the first event if all events are in the future
+        if run_after < self.event_dates[0]:
+            return DataInterval.exact(self.event_dates[0])
+        else:
+            past_events = itertools.dropwhile(lambda when: when > run_after, self.event_dates[::-1])
+            most_recent_event = next(past_events)
+            return DataInterval.exact(most_recent_event)
+
+    def serialize(self):
+        return {
+            "event_dates": [str(x) for x in self.event_dates],
+            "restrict_to_events": self.restrict_to_events,
+        }
+
+    @classmethod
+    def deserialize(cls, data) -> Timetable:
+        return cls(
+            [pendulum.DateTime.fromisoformat(x) for x in data["event_dates"]],
+            data["restrict_to_events"],
+            presorted=True,
+        )
diff --git a/tests/timetables/test_events_timetable.py b/tests/timetables/test_events_timetable.py
new file mode 100644
index 0000000000..f86a321220
--- /dev/null
+++ b/tests/timetables/test_events_timetable.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pendulum
+import pytest
+
+from airflow.settings import TIMEZONE
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+from airflow.timetables.events import EventsTimetable
+
+START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE)  # Precedes all events
+
+EVENT_DATES = [
+    pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE),
+    pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE),
+    pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE),
+    pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE),  # deliberate duplicate, should be ignored
+    pendulum.DateTime(2021, 10, 9, tzinfo=TIMEZONE),  # deliberately out of order
+    pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE),
+]
+
+EVENT_DATES_SORTED = [
+    pendulum.DateTime(2021, 9, 6, tzinfo=TIMEZONE),
+    pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE),
+    pendulum.DateTime(2021, 9, 8, tzinfo=TIMEZONE),
+    pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE),
+    pendulum.DateTime(2021, 10, 9, tzinfo=TIMEZONE),
+]
+
+NON_EVENT_DATE = pendulum.DateTime(2021, 10, 1, tzinfo=TIMEZONE)
+MOST_RECENT_EVENT = pendulum.DateTime(2021, 9, 10, tzinfo=TIMEZONE)
+
+
+@pytest.fixture()
+def restriction():
+    return TimeRestriction(earliest=START_DATE, latest=None, catchup=True)
+
+
+@pytest.fixture()
+def unrestricted_timetable():
+    return EventsTimetable(event_dates=EVENT_DATES)
+
+
+@pytest.fixture()
+def restricted_timetable():
+    return EventsTimetable(event_dates=EVENT_DATES, restrict_to_events=True)
+
+
+@pytest.mark.parametrize(
+    "start, end",
+    list(zip(EVENT_DATES, EVENT_DATES)),
+)
+def test_dag_run_info_interval(start: pendulum.DateTime, end: pendulum.DateTime):
+    expected_info = DagRunInfo(run_after=end, data_interval=DataInterval(start, end))
+    assert DagRunInfo.interval(start, end) == expected_info
+
+
+def test_manual_with_unrestricted(unrestricted_timetable: Timetable, restriction: TimeRestriction):
+    """When not using strict event dates, manual runs have run_after as the data interval"""
+    manual_run_data_interval = unrestricted_timetable.infer_manual_data_interval(run_after=NON_EVENT_DATE)
+    expected_data_interval = DataInterval.exact(NON_EVENT_DATE)
+    assert expected_data_interval == manual_run_data_interval
+
+
+def test_manual_with_restricted_middle(restricted_timetable: Timetable, restriction: TimeRestriction):
+    """
+    Test that when using strict event dates, manual runs after the first event have the
+    most recent event's date as the start interval
+    """
+    manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=NON_EVENT_DATE)
+    expected_data_interval = DataInterval.exact(MOST_RECENT_EVENT)
+    assert expected_data_interval == manual_run_data_interval
+
+
+def test_manual_with_restricted_before(restricted_timetable: Timetable, restriction: TimeRestriction):
+    """
+    Test that when using strict event dates, manual runs before the first event have the first event's date
+    as the start interval
+    """
+    manual_run_data_interval = restricted_timetable.infer_manual_data_interval(run_after=START_DATE)
+    expected_data_interval = DataInterval.exact(EVENT_DATES[0])
+    assert expected_data_interval == manual_run_data_interval
+
+
+@pytest.mark.parametrize(
+    "last_automated_data_interval, expected_next_info",
+    [
+        pytest.param(
+            DataInterval(day, day),
+            DagRunInfo.interval(
+                EVENT_DATES_SORTED[event_num + 1],
+                EVENT_DATES_SORTED[event_num + 1],
+            ),
+        )
+        for event_num, day in enumerate(EVENT_DATES_SORTED[:-1])
+    ]
+    + [pytest.param(DataInterval(EVENT_DATES_SORTED[-1], EVENT_DATES_SORTED[-1]), None)],
+)
+def test_subsequent_weekday_schedule(
+    unrestricted_timetable: Timetable,
+    restriction: TimeRestriction,
+    last_automated_data_interval: DataInterval,
+    expected_next_info: DagRunInfo,
+):
+    """The next four subsequent runs cover the next four weekdays each."""
+    next_info = unrestricted_timetable.next_dagrun_info(
+        last_automated_data_interval=last_automated_data_interval,
+        restriction=restriction,
+    )
+    assert next_info == expected_next_info

Reply via email to