This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 954bb60e87 Straighten typing in workday timetable (#36296)
954bb60e87 is described below
commit 954bb60e876b7cbb491ec7542ecdbb6bb9b8ab03
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Dec 19 16:55:58 2023 +0800
Straighten typing in workday timetable (#36296)
---
airflow/example_dags/plugins/workday.py | 29 +++++------
tests/plugins/workday.py | 90 +++------------------------------
2 files changed, 20 insertions(+), 99 deletions(-)
diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py
index 848ef14051..e78367e96c 100644
--- a/airflow/example_dags/plugins/workday.py
+++ b/airflow/example_dags/plugins/workday.py
@@ -73,23 +73,20 @@ class AfterWorkdayTimetable(Timetable):
) -> DagRunInfo | None:
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
last_start = last_automated_data_interval.start
- next_start = DateTime.combine((last_start +
timedelta(days=1)).date(), Time.min).replace(
- tzinfo=UTC
- )
- else: # This is the first ever run on the regular schedule.
- next_start = restriction.earliest
- if next_start is None: # No start_date. Don't schedule.
- return None
- if not restriction.catchup:
- # If the DAG has catchup=False, today is the earliest to
consider.
- next_start = max(next_start, DateTime.combine(Date.today(),
Time.min).replace(tzinfo=UTC))
- elif next_start.time() != Time.min:
- # If earliest does not fall on midnight, skip to the next day.
- next_start = DateTime.combine(next_start.date() +
timedelta(days=1), Time.min).replace(
- tzinfo=UTC
- )
+ next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
+ # Otherwise this is the first ever run on the regular schedule...
+ elif (earliest := restriction.earliest) is None:
+ return None # No start_date. Don't schedule.
+ elif not restriction.catchup:
+ # If the DAG has catchup=False, today is the earliest to consider.
+ next_start = max(earliest, DateTime.combine(Date.today(), Time.min))
+ elif earliest.time() != Time.min:
+ # If earliest does not fall on midnight, skip to the next day.
+ next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
+ else:
+ next_start = earliest
# Skip weekends and holidays
- next_start = self.get_next_workday(next_start)
+ next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))
if restriction.latest is not None and next_start > restriction.latest:
return None # Over the DAG's scheduled end; don't schedule.
diff --git a/tests/plugins/workday.py b/tests/plugins/workday.py
index 72202fcc5c..a73cf12a55 100644
--- a/tests/plugins/workday.py
+++ b/tests/plugins/workday.py
@@ -15,89 +15,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-"""Plugin to demonstrate timetable registration and accommodate example
DAGs."""
-from __future__ import annotations
-
-import logging
-from datetime import timedelta
-
-# [START howto_timetable]
-from pendulum import UTC, Date, DateTime, Time
-
-from airflow.plugins_manager import AirflowPlugin
-from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction,
Timetable
-
-log = logging.getLogger(__name__)
-try:
- from pandas.tseries.holiday import USFederalHolidayCalendar
-
- holiday_calendar = USFederalHolidayCalendar()
-except ImportError:
- log.warning("Could not import pandas. Holidays will not be considered.")
- holiday_calendar = None # type: ignore[assignment]
-
-
-class AfterWorkdayTimetable(Timetable):
- def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
- next_start = d
- while True:
- if next_start.weekday() in (5, 6): # If next start is in the
weekend go to next day
- next_start = next_start + incr * timedelta(days=1)
- continue
- if holiday_calendar is not None:
- holidays = holiday_calendar.holidays(start=next_start,
end=next_start).to_pydatetime()
- if next_start in holidays: # If next start is a holiday go to
next day
- next_start = next_start + incr * timedelta(days=1)
- continue
- break
- return next_start
+"""Plugin to demonstrate timetable registration and accommodate example DAGs.
- # [START howto_timetable_infer_manual_data_interval]
- def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
- start = DateTime.combine((run_after - timedelta(days=1)).date(),
Time.min).replace(tzinfo=UTC)
- # Skip backwards over weekends and holidays to find last run
- start = self.get_next_workday(start, incr=-1)
- return DataInterval(start=start, end=(start + timedelta(days=1)))
-
- # [END howto_timetable_infer_manual_data_interval]
-
- # [START howto_timetable_next_dagrun_info]
- def next_dagrun_info(
- self,
- *,
- last_automated_data_interval: DataInterval | None,
- restriction: TimeRestriction,
- ) -> DagRunInfo | None:
- if last_automated_data_interval is not None: # There was a previous
run on the regular schedule.
- last_start = last_automated_data_interval.start
- next_start = DateTime.combine((last_start +
timedelta(days=1)).date(), Time.min).replace(
- tzinfo=UTC
- )
- else: # This is the first ever run on the regular schedule.
- next_start = restriction.earliest
- if next_start is None: # No start_date. Don't schedule.
- return None
- if not restriction.catchup:
- # If the DAG has catchup=False, today is the earliest to
consider.
- next_start = max(next_start, DateTime.combine(Date.today(),
Time.min).replace(tzinfo=UTC))
- elif next_start.time() != Time.min:
- # If earliest does not fall on midnight, skip to the next day.
- next_start = DateTime.combine(next_start.date() +
timedelta(days=1), Time.min).replace(
- tzinfo=UTC
- )
- # Skip weekends and holidays
- next_start = self.get_next_workday(next_start)
-
- if restriction.latest is not None and next_start > restriction.latest:
- return None # Over the DAG's scheduled end; don't schedule.
- return DagRunInfo.interval(start=next_start, end=(next_start +
timedelta(days=1)))
-
- # [END howto_timetable_next_dagrun_info]
-
-
-class WorkdayTimetablePlugin(AirflowPlugin):
- name = "workday_timetable_plugin"
- timetables = [AfterWorkdayTimetable]
+This simply forwards the timetable from ``airflow.example_dags``, so we can make
+it discoverable to unit tests without exposing the entire subpackage.
+"""
+from __future__ import annotations
+from airflow.example_dags.plugins.workday import AfterWorkdayTimetable, WorkdayTimetablePlugin
-# [END howto_timetable]
+__all__ = ["AfterWorkdayTimetable", "WorkdayTimetablePlugin"]