This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 04a5de3bb1f07341c3d099daca3ef7fb2a4a5502 Author: Tzu-ping Chung <[email protected]> AuthorDate: Fri Oct 22 15:38:57 2021 +0800 Add test for interval timetable catchup=False (#19145) (cherry picked from commit 205219c522b77abe9d36d51807c32189c900cfd4) --- airflow/timetables/interval.py | 24 +++++----- tests/jobs/test_scheduler_job.py | 48 +++++++++++++++++++ tests/timetables/test_interval_timetable.py | 74 +++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 12 deletions(-) diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index 0dd7ffd..a095565 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -74,22 +74,22 @@ class _DataIntervalTimetable(Timetable): earliest = restriction.earliest if not restriction.catchup: earliest = self._skip_to_latest(earliest) + elif earliest is not None: + earliest = self._align(earliest) if last_automated_data_interval is None: # First run; schedule the run at the first available time matching # the schedule, and retrospectively create a data interval for it. if earliest is None: return None - start = self._align(earliest) - else: - # There's a previous run. + start = earliest + else: # There's a previous run. if earliest is not None: # Catchup is False or DAG has new start date in the future. - # Make sure we get the latest start date + # Make sure we get the later one. start = max(last_automated_data_interval.end, earliest) else: - # Create a data interval starting from when the end of the previous interval. + # Data interval starts from the end of the previous interval. start = last_automated_data_interval.end - if restriction.latest is not None and start > restriction.latest: return None end = self._get_next(start) @@ -189,8 +189,8 @@ class CronDataIntervalTimetable(_DataIntervalTimetable): def _align(self, current: DateTime) -> DateTime: """Get the next scheduled time. 
- This is ``current + interval``, unless ``current`` is first interval, - then ``current`` is returned. + This is ``current + interval``, unless ``current`` falls right on the + interval boundary, when ``current`` is returned. """ next_time = self._get_next(current) if self._get_prev(next_time) != current: @@ -205,14 +205,14 @@ class CronDataIntervalTimetable(_DataIntervalTimetable): This is slightly different from the delta version at terminal values. If the next schedule should start *right now*, we want the data interval - that start right now now, not the one that ends now. + that start now, not the one that ends now. """ current_time = DateTime.utcnow() - next_start = self._get_next(current_time) last_start = self._get_prev(current_time) - if next_start == current_time: + next_start = self._get_next(last_start) + if next_start == current_time: # Current time is on interval boundary. new_start = last_start - elif next_start > current_time: + elif next_start > current_time: # Current time is between boundaries. 
new_start = self._get_prev(last_start) else: raise AssertionError("next schedule shouldn't be earlier") diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 16b2921..4922617 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -3578,3 +3578,51 @@ class TestSchedulerJobQueriesCount: assert start_date is None assert end_date is None assert duration is None + + def test_catchup_works_correctly(self, dag_maker): + """Test that catchup works correctly""" + session = settings.Session() + with dag_maker( + dag_id='test_catchup_schedule_dag', + schedule_interval=timedelta(days=1), + start_date=DEFAULT_DATE, + catchup=True, + max_active_runs=1, + session=session, + ) as dag: + DummyOperator(task_id='dummy') + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + self.scheduler_job.executor = MockExecutor() + self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent) + + self.scheduler_job._create_dag_runs([dag_maker.dag_model], session) + self.scheduler_job._start_queued_dagruns(session) + # first dagrun execution date is DEFAULT_DATE 2016-01-01T00:00:00+00:00 + dr = DagRun.find(execution_date=DEFAULT_DATE, session=session)[0] + ti = dr.get_task_instance(task_id='dummy') + ti.state = State.SUCCESS + session.merge(ti) + session.flush() + + self.scheduler_job._schedule_dag_run(dr, session) + session.flush() + + # Run the second time so _update_dag_next_dagrun will run + self.scheduler_job._schedule_dag_run(dr, session) + session.flush() + + dag.catchup = False + dag.sync_to_db() + assert not dag.catchup + + dm = DagModel.get_dagmodel(dag.dag_id) + self.scheduler_job._create_dag_runs([dm], session) + + # Check catchup worked correctly by ensuring execution_date is quite new + # Our dag is a daily dag + assert ( + session.query(DagRun.execution_date) + .filter(DagRun.execution_date != DEFAULT_DATE) # exclude the first run + .scalar() + ) > (timezone.utcnow() - timedelta(days=2)) diff 
--git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py new file mode 100644 index 0000000..53f5aeb --- /dev/null +++ b/tests/timetables/test_interval_timetable.py @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import datetime +from typing import Optional + +import freezegun +import pendulum +import pytest + +from airflow.settings import TIMEZONE +from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable +from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable + +START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE) + +PREV_DATA_INTERVAL_START = START_DATE +PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1) +PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END) + +CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) + +HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE) +HOURLY_DELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1)) + + +@pytest.mark.parametrize( + "timetable", + [pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")], +) +@pytest.mark.parametrize( + "last_automated_data_interval", + [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")], +) +@freezegun.freeze_time(CURRENT_TIME) +def test_no_catchup_next_info_starts_at_current_time( + timetable: Timetable, + last_automated_data_interval: Optional[DataInterval], +) -> None: + """If ``catchup=False``, the next data interval ends at the current time.""" + next_info = timetable.next_dagrun_info( + last_automated_data_interval=last_automated_data_interval, + restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False), + ) + expected_start = CURRENT_TIME - datetime.timedelta(hours=1) + assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME) + + +@pytest.mark.parametrize( + "timetable", + [pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")], +) +def test_catchup_next_info_starts_at_previous_interval_end(timetable: Timetable) -> None: + """If ``catchup=True``, the next interval starts at the previous's
end.""" + next_info = timetable.next_dagrun_info( + last_automated_data_interval=PREV_DATA_INTERVAL, + restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=True), + ) + expected_end = PREV_DATA_INTERVAL_END + datetime.timedelta(hours=1) + assert next_info == DagRunInfo.interval(start=PREV_DATA_INTERVAL_END, end=expected_end)
