This is an automated email from the ASF dual-hosted git repository. uranusjr pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 954bb60e87 Straighten typing in workday timetable (#36296) 954bb60e87 is described below commit 954bb60e876b7cbb491ec7542ecdbb6bb9b8ab03 Author: Tzu-ping Chung <uranu...@gmail.com> AuthorDate: Tue Dec 19 16:55:58 2023 +0800 Straighten typing in workday timetable (#36296) --- airflow/example_dags/plugins/workday.py | 29 +++++------ tests/plugins/workday.py | 90 +++------------------------------ 2 files changed, 20 insertions(+), 99 deletions(-) diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py index 848ef14051..e78367e96c 100644 --- a/airflow/example_dags/plugins/workday.py +++ b/airflow/example_dags/plugins/workday.py @@ -73,23 +73,20 @@ class AfterWorkdayTimetable(Timetable): ) -> DagRunInfo | None: if last_automated_data_interval is not None: # There was a previous run on the regular schedule. last_start = last_automated_data_interval.start - next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min).replace( - tzinfo=UTC - ) - else: # This is the first ever run on the regular schedule. - next_start = restriction.earliest - if next_start is None: # No start_date. Don't schedule. - return None - if not restriction.catchup: - # If the DAG has catchup=False, today is the earliest to consider. - next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) - elif next_start.time() != Time.min: - # If earliest does not fall on midnight, skip to the next day. - next_start = DateTime.combine(next_start.date() + timedelta(days=1), Time.min).replace( - tzinfo=UTC - ) + next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min) + # Otherwise this is the first ever run on the regular schedule... + elif (earliest := restriction.earliest) is None: + return None # No start_date. Don't schedule. + elif not restriction.catchup: + # If the DAG has catchup=False, today is the earliest to consider. + next_start = max(earliest, DateTime.combine(Date.today(), Time.min)) + elif earliest.time() != Time.min: + # If earliest does not fall on midnight, skip to the next day. + next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min) + else: + next_start = earliest # Skip weekends and holidays - next_start = self.get_next_workday(next_start) + next_start = self.get_next_workday(next_start.replace(tzinfo=UTC)) if restriction.latest is not None and next_start > restriction.latest: return None # Over the DAG's scheduled end; don't schedule. diff --git a/tests/plugins/workday.py b/tests/plugins/workday.py index 72202fcc5c..a73cf12a55 100644 --- a/tests/plugins/workday.py +++ b/tests/plugins/workday.py @@ -15,89 +15,13 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Plugin to demonstrate timetable registration and accommodate example DAGs.""" -from __future__ import annotations - -import logging -from datetime import timedelta - -# [START howto_timetable] -from pendulum import UTC, Date, DateTime, Time - -from airflow.plugins_manager import AirflowPlugin -from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable - -log = logging.getLogger(__name__) -try: - from pandas.tseries.holiday import USFederalHolidayCalendar - - holiday_calendar = USFederalHolidayCalendar() -except ImportError: - log.warning("Could not import pandas. Holidays will not be considered.") - holiday_calendar = None # type: ignore[assignment] - - -class AfterWorkdayTimetable(Timetable): - def get_next_workday(self, d: DateTime, incr=1) -> DateTime: - next_start = d - while True: - if next_start.weekday() in (5, 6): # If next start is in the weekend go to next day - next_start = next_start + incr * timedelta(days=1) - continue - if holiday_calendar is not None: - holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime() - if next_start in holidays: # If next start is a holiday go to next day - next_start = next_start + incr * timedelta(days=1) - continue - break - return next_start +"""Plugin to demonstrate timetable registration and accommodate example DAGs. - # [START howto_timetable_infer_manual_data_interval] - def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval: - start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC) - # Skip backwards over weekends and holidays to find last run - start = self.get_next_workday(start, incr=-1) - return DataInterval(start=start, end=(start + timedelta(days=1))) - - # [END howto_timetable_infer_manual_data_interval] - - # [START howto_timetable_next_dagrun_info] - def next_dagrun_info( - self, - *, - last_automated_data_interval: DataInterval | None, - restriction: TimeRestriction, - ) -> DagRunInfo | None: - if last_automated_data_interval is not None: # There was a previous run on the regular schedule. - last_start = last_automated_data_interval.start - next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min).replace( - tzinfo=UTC - ) - else: # This is the first ever run on the regular schedule. - next_start = restriction.earliest - if next_start is None: # No start_date. Don't schedule. - return None - if not restriction.catchup: - # If the DAG has catchup=False, today is the earliest to consider. - next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC)) - elif next_start.time() != Time.min: - # If earliest does not fall on midnight, skip to the next day. - next_start = DateTime.combine(next_start.date() + timedelta(days=1), Time.min).replace( - tzinfo=UTC - ) - # Skip weekends and holidays - next_start = self.get_next_workday(next_start) - - if restriction.latest is not None and next_start > restriction.latest: - return None # Over the DAG's scheduled end; don't schedule. - return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1))) - - # [END howto_timetable_next_dagrun_info] - - -class WorkdayTimetablePlugin(AirflowPlugin): - name = "workday_timetable_plugin" - timetables = [AfterWorkdayTimetable] +This simply forwards the timetable from ``airflow.example_dags``, so we can make +it discoverable to unit tests without exposing the entire subpackage. +""" +from __future__ import annotations +from airflow.example_dags.plugins.workday import AfterWorkdayTimetable, WorkdayTimetablePlugin -# [END howto_timetable] +__all__ = ["AfterWorkdayTimetable", "WorkdayTimetablePlugin"]