This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 04a5de3bb1f07341c3d099daca3ef7fb2a4a5502
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Fri Oct 22 15:38:57 2021 +0800

    Add test for interval timetable catchup=False (#19145)
    
    (cherry picked from commit 205219c522b77abe9d36d51807c32189c900cfd4)
---
 airflow/timetables/interval.py              | 24 +++++-----
 tests/jobs/test_scheduler_job.py            | 48 +++++++++++++++++++
 tests/timetables/test_interval_timetable.py | 74 +++++++++++++++++++++++++++++
 3 files changed, 134 insertions(+), 12 deletions(-)

diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py
index 0dd7ffd..a095565 100644
--- a/airflow/timetables/interval.py
+++ b/airflow/timetables/interval.py
@@ -74,22 +74,22 @@ class _DataIntervalTimetable(Timetable):
         earliest = restriction.earliest
         if not restriction.catchup:
             earliest = self._skip_to_latest(earliest)
+        elif earliest is not None:
+            earliest = self._align(earliest)
         if last_automated_data_interval is None:
             # First run; schedule the run at the first available time matching
             # the schedule, and retrospectively create a data interval for it.
             if earliest is None:
                 return None
-            start = self._align(earliest)
-        else:
-            # There's a previous run.
+            start = earliest
+        else:  # There's a previous run.
             if earliest is not None:
                 # Catchup is False or DAG has new start date in the future.
-                # Make sure we get the latest start date
+                # Make sure we get the later one.
                 start = max(last_automated_data_interval.end, earliest)
             else:
-                # Create a data interval starting from when the end of the previous interval.
+                # Data interval starts from the end of the previous interval.
                 start = last_automated_data_interval.end
-
         if restriction.latest is not None and start > restriction.latest:
             return None
         end = self._get_next(start)
@@ -189,8 +189,8 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
     def _align(self, current: DateTime) -> DateTime:
         """Get the next scheduled time.
 
-        This is ``current + interval``, unless ``current`` is first interval,
-        then ``current`` is returned.
+        This is ``current + interval``, unless ``current`` falls right on the
+        interval boundary, when ``current`` is returned.
         """
         next_time = self._get_next(current)
         if self._get_prev(next_time) != current:
@@ -205,14 +205,14 @@ class CronDataIntervalTimetable(_DataIntervalTimetable):
 
         This is slightly different from the delta version at terminal values.
         If the next schedule should start *right now*, we want the data interval
-        that start right now now, not the one that ends now.
+        that start now, not the one that ends now.
         """
         current_time = DateTime.utcnow()
-        next_start = self._get_next(current_time)
         last_start = self._get_prev(current_time)
-        if next_start == current_time:
+        next_start = self._get_next(last_start)
+        if next_start == current_time:  # Current time is on interval boundary.
             new_start = last_start
-        elif next_start > current_time:
+        elif next_start > current_time:  # Current time is between boundaries.
             new_start = self._get_prev(last_start)
         else:
             raise AssertionError("next schedule shouldn't be earlier")
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 16b2921..4922617 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3578,3 +3578,51 @@ class TestSchedulerJobQueriesCount:
                 assert start_date is None
                 assert end_date is None
                 assert duration is None
+
+    def test_catchup_works_correctly(self, dag_maker):
+        """Test that catchup works correctly"""
+        session = settings.Session()
+        with dag_maker(
+            dag_id='test_catchup_schedule_dag',
+            schedule_interval=timedelta(days=1),
+            start_date=DEFAULT_DATE,
+            catchup=True,
+            max_active_runs=1,
+            session=session,
+        ) as dag:
+            DummyOperator(task_id='dummy')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)
+        self.scheduler_job._start_queued_dagruns(session)
+        # first dagrun execution date is DEFAULT_DATE 2016-01-01T00:00:00+00:00
+        dr = DagRun.find(execution_date=DEFAULT_DATE, session=session)[0]
+        ti = dr.get_task_instance(task_id='dummy')
+        ti.state = State.SUCCESS
+        session.merge(ti)
+        session.flush()
+
+        self.scheduler_job._schedule_dag_run(dr, session)
+        session.flush()
+
+        # Run the second time so _update_dag_next_dagrun will run
+        self.scheduler_job._schedule_dag_run(dr, session)
+        session.flush()
+
+        dag.catchup = False
+        dag.sync_to_db()
+        assert not dag.catchup
+
+        dm = DagModel.get_dagmodel(dag.dag_id)
+        self.scheduler_job._create_dag_runs([dm], session)
+
+        # Check catchup worked correctly by ensuring execution_date is quite new
+        # Our dag is a daily dag
+        assert (
+            session.query(DagRun.execution_date)
+            .filter(DagRun.execution_date != DEFAULT_DATE)  # exclude the first run
+            .scalar()
+        ) > (timezone.utcnow() - timedelta(days=2))
diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py
new file mode 100644
index 0000000..53f5aeb
--- /dev/null
+++ b/tests/timetables/test_interval_timetable.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import datetime
+from typing import Optional
+
+import freezegun
+import pendulum
+import pytest
+
+from airflow.settings import TIMEZONE
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
+
+START_DATE = pendulum.DateTime(2021, 9, 4, tzinfo=TIMEZONE)
+
+PREV_DATA_INTERVAL_START = START_DATE
+PREV_DATA_INTERVAL_END = START_DATE + datetime.timedelta(days=1)
+PREV_DATA_INTERVAL = DataInterval(start=PREV_DATA_INTERVAL_START, end=PREV_DATA_INTERVAL_END)
+
+CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)
+
+HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
+HOURLY_DELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
+
+
+@pytest.mark.parametrize(
+    "timetable",
+    [pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")],
+)
+@pytest.mark.parametrize(
+    "last_automated_data_interval",
+    [pytest.param(None, id="first-run"), pytest.param(PREV_DATA_INTERVAL, id="subsequent")],
+)
+@freezegun.freeze_time(CURRENT_TIME)
+def test_no_catchup_next_info_starts_at_current_time(
+    timetable: Timetable,
+    last_automated_data_interval: Optional[DataInterval],
+) -> None:
+    """If ``catchup=False``, the next data interval ends at the current time."""
+    next_info = timetable.next_dagrun_info(
+        last_automated_data_interval=last_automated_data_interval,
+        restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=False),
+    )
+    expected_start = CURRENT_TIME - datetime.timedelta(hours=1)
+    assert next_info == DagRunInfo.interval(start=expected_start, end=CURRENT_TIME)
+
+
+@pytest.mark.parametrize(
+    "timetable",
+    [pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")],
+)
+def test_catchup_next_info_starts_at_previous_interval_end(timetable: Timetable) -> None:
+    """If ``catchup=True``, the next interval starts at the previous's end."""
+    next_info = timetable.next_dagrun_info(
+        last_automated_data_interval=PREV_DATA_INTERVAL,
+        restriction=TimeRestriction(earliest=START_DATE, latest=None, catchup=True),
+    )
+    expected_end = PREV_DATA_INTERVAL_END + datetime.timedelta(hours=1)
+    assert next_info == DagRunInfo.interval(start=PREV_DATA_INTERVAL_END, end=expected_end)

Reply via email to