This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 582e0d53af Events Timetable (#22332)
582e0d53af is described below
commit 582e0d53af78f881cc0f9e5b063bef11f18f7999
Author: Collin McNulty <[email protected]>
AuthorDate: Fri Apr 8 15:38:23 2022 -0500
Events Timetable (#22332)
This Timetable is useful for scheduling around sporting events, planned communication
campaigns, and other schedules that are irregular and arbitrary but known in advance.
---
airflow/timetables/events.py | 114 +++++++++++++++++++++++++++
tests/timetables/test_events_timetable.py | 124 ++++++++++++++++++++++++++++++
2 files changed, 238 insertions(+)
diff --git a/airflow/timetables/events.py b/airflow/timetables/events.py
new file mode 100644
index 0000000000..8024045f01
--- /dev/null
+++ b/airflow/timetables/events.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+from typing import Iterable, Optional
+
+import pendulum
+from pendulum import DateTime
+
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
+
+
+class EventsTimetable(Timetable):
+    """
+    Timetable that schedules DAG runs at specific listed datetimes. Suitable for
+    predictable but truly irregular scheduling such as sporting events.
+
+    :param event_dates: List of datetimes for the DAG to run at. Duplicates will be ignored. Must be finite
+        and of reasonable size as it will be loaded in its entirety. May be empty, in which case no
+        automated runs are ever scheduled.
+    :param restrict_to_events: Whether manual runs should use the most recent event or
+        the current time
+    :param presorted: if True, event_dates will be assumed to be in ascending order. Provides modest
+        performance improvement for larger lists of event_dates.
+    :param description: A name for the timetable to display in the UI. Default None will be shown as
+        "X Events" where X is the number of distinct event_dates
+    """
+
+    def __init__(
+        self,
+        event_dates: Iterable[DateTime],
+        restrict_to_events: bool = False,
+        presorted: bool = False,
+        description: Optional[str] = None,
+    ):
+        dates = list(event_dates)  # Must be reversible and indexable
+        if not presorted:
+            # For long lists this could take a while, so only want to do it once
+            dates.sort()
+        # Honour the documented "duplicates are ignored" contract: on a sorted list equal datetimes
+        # are adjacent, so groupby collapses each run of duplicates into a single entry.
+        self.event_dates = [when for when, _ in itertools.groupby(dates)]
+        self.restrict_to_events = restrict_to_events
+        # Remember whether the caller supplied a description so serialize() can preserve it.
+        self._custom_description = description
+        if description is None:
+            if self.event_dates:
+                self.description = (
+                    f"{len(self.event_dates)} Events between "
+                    f"{self.event_dates[0]} and {self.event_dates[-1]}"
+                )
+            else:
+                # Indexing [0] / [-1] above would raise IndexError on an empty list.
+                self.description = "No Events"
+            self._summary = f"{len(self.event_dates)} Events"
+        else:
+            self._summary = description
+            self.description = description
+
+    @property
+    def summary(self) -> str:
+        return self._summary
+
+    def __repr__(self):
+        return self.summary
+
+    def next_dagrun_info(
+        self,
+        *,
+        last_automated_data_interval: Optional[DataInterval],
+        restriction: TimeRestriction,
+    ) -> Optional[DagRunInfo]:
+        """
+        Return the first event strictly after the previous automated run that also lies inside the
+        DAG's scheduling window (``restriction``), or None when no such event exists.
+        """
+        earliest = restriction.earliest
+        if not restriction.catchup:
+            # Without catchup, events that are already in the past must not be back-filled.
+            now = pendulum.now("UTC")
+            if earliest is None or now > earliest:
+                earliest = now
+        last_end = None if last_automated_data_interval is None else last_automated_data_interval.end
+
+        next_event = next(
+            (
+                when
+                for when in self.event_dates
+                if (earliest is None or when >= earliest) and (last_end is None or when > last_end)
+            ),
+            None,
+        )
+        if next_event is None:
+            return None
+        # Respect the DAG's end_date: never schedule an event after the latest allowed time.
+        if restriction.latest is not None and next_event > restriction.latest:
+            return None
+        return DagRunInfo.exact(next_event)
+
+    def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
+        # If Timetable not restricted to events (or there are no events to restrict to),
+        # run for the time specified
+        if not self.restrict_to_events or not self.event_dates:
+            return DataInterval.exact(run_after)
+
+        # If restricted to events, run for the most recent past event
+        # or for the first event if all events are in the future
+        if run_after < self.event_dates[0]:
+            return DataInterval.exact(self.event_dates[0])
+        # reversed() walks the sorted list newest-first without copying it as [::-1] would; a match
+        # is guaranteed because run_after >= event_dates[0] here.
+        most_recent_event = next(when for when in reversed(self.event_dates) if when <= run_after)
+        return DataInterval.exact(most_recent_event)
+
+    def serialize(self):
+        return {
+            "event_dates": [str(x) for x in self.event_dates],
+            "restrict_to_events": self.restrict_to_events,
+            # Only a caller-supplied description is stored; the default one is recomputed on load.
+            "description": self._custom_description,
+        }
+
+    @classmethod
+    def deserialize(cls, data) -> Timetable:
+        return cls(
+            [pendulum.DateTime.fromisoformat(x) for x in data["event_dates"]],
+            data["restrict_to_events"],
+            presorted=True,
+            # Payloads serialized before "description" was stored simply lack the key.
+            description=data.get("description"),
+        )
diff --git a/tests/timetables/test_events_timetable.py b/tests/timetables/test_events_timetable.py
new file mode 100644
index 0000000000..f86a321220
--- /dev/null
+++ b/tests/timetables/test_events_timetable.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import pendulum
+import pytest
+
+from airflow.settings import TIMEZONE
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
+from airflow.timetables.events import EventsTimetable
+
+
+def _day(month: int, day: int) -> pendulum.DateTime:
+    """Midnight on the given 2021 month/day in the configured Airflow timezone."""
+    return pendulum.DateTime(2021, month, day, tzinfo=TIMEZONE)
+
+
+START_DATE = _day(9, 4)  # Precedes all events
+
+# Raw input handed to the timetable: contains a duplicate and is not in order.
+EVENT_DATES = [
+    _day(9, 6),
+    _day(9, 7),
+    _day(9, 8),
+    _day(9, 8),  # deliberate duplicate, should be ignored
+    _day(10, 9),  # deliberately out of order
+    _day(9, 10),
+]
+
+# What the timetable should end up holding: ascending and duplicate-free.
+EVENT_DATES_SORTED = [
+    _day(9, 6),
+    _day(9, 7),
+    _day(9, 8),
+    _day(9, 10),
+    _day(10, 9),
+]
+
+NON_EVENT_DATE = _day(10, 1)
+MOST_RECENT_EVENT = _day(9, 10)  # latest event on or before NON_EVENT_DATE
+
+
+@pytest.fixture()
+def restriction():
+    """Scheduling window that opens before every event, has no end and allows catchup."""
+    return TimeRestriction(earliest=START_DATE, latest=None, catchup=True)
+
+
+@pytest.fixture()
+def unrestricted_timetable():
+    """Timetable whose manual runs use the requested time rather than snapping to an event."""
+    return EventsTimetable(event_dates=EVENT_DATES)
+
+
+@pytest.fixture()
+def restricted_timetable():
+    """Timetable whose manual runs are snapped to an event date."""
+    return EventsTimetable(event_dates=EVENT_DATES, restrict_to_events=True)
+
+
+@pytest.mark.parametrize(
+    "start, end",
+    list(zip(EVENT_DATES, EVENT_DATES)),
+)
+def test_dag_run_info_interval(start: pendulum.DateTime, end: pendulum.DateTime):
+    """DagRunInfo.interval() builds a run whose run_after is the end of its data interval."""
+    expected_info = DagRunInfo(run_after=end, data_interval=DataInterval(start, end))
+    assert DagRunInfo.interval(start, end) == expected_info
+
+
+def test_manual_with_unrestricted(unrestricted_timetable: Timetable, restriction: TimeRestriction):
+    """Without restrict_to_events, a manual run's data interval is exactly its run_after time."""
+    expected = DataInterval.exact(NON_EVENT_DATE)
+    actual = unrestricted_timetable.infer_manual_data_interval(run_after=NON_EVENT_DATE)
+    assert expected == actual
+
+
+def test_manual_with_restricted_middle(restricted_timetable: Timetable, restriction: TimeRestriction):
+    """
+    With restrict_to_events, a manual run requested after the first event is pinned to the
+    latest event on or before the requested time.
+    """
+    expected = DataInterval.exact(MOST_RECENT_EVENT)
+    actual = restricted_timetable.infer_manual_data_interval(run_after=NON_EVENT_DATE)
+    assert expected == actual
+
+
+def test_manual_with_restricted_before(restricted_timetable: Timetable, restriction: TimeRestriction):
+    """
+    With restrict_to_events, a manual run requested before any event is pinned to the
+    first event's date.
+    """
+    expected = DataInterval.exact(EVENT_DATES[0])
+    actual = restricted_timetable.infer_manual_data_interval(run_after=START_DATE)
+    assert expected == actual
+
+
+@pytest.mark.parametrize(
+    "last_automated_data_interval, expected_next_info",
+    [pytest.param(None, DagRunInfo.exact(EVENT_DATES_SORTED[0]), id="first-run")]
+    + [
+        pytest.param(
+            DataInterval.exact(day),
+            DagRunInfo.exact(next_day),
+            id=f"after-event-{event_num}",
+        )
+        for event_num, (day, next_day) in enumerate(zip(EVENT_DATES_SORTED, EVENT_DATES_SORTED[1:]))
+    ]
+    + [pytest.param(DataInterval.exact(EVENT_DATES_SORTED[-1]), None, id="after-last-event")],
+)
+def test_subsequent_weekday_schedule(
+    unrestricted_timetable: Timetable,
+    restriction: TimeRestriction,
+    last_automated_data_interval: Optional[DataInterval],
+    expected_next_info: Optional[DagRunInfo],
+):
+    """
+    The first automated run lands on the earliest event, each later run on the next distinct
+    event in ascending order, and nothing is scheduled after the last event.
+    """
+    next_info = unrestricted_timetable.next_dagrun_info(
+        last_automated_data_interval=last_automated_data_interval,
+        restriction=restriction,
+    )
+    assert next_info == expected_next_info