robinedwards commented on a change in pull request #19145:
URL: https://github.com/apache/airflow/pull/19145#discussion_r734306022
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -3581,3 +3581,51 @@ def test_should_mark_dummy_task_as_success(self):
assert start_date is None
assert end_date is None
assert duration is None
+
+ def test_catchup_works_correctly(self, dag_maker):
+ """Test that catchup works correctly"""
+ session = settings.Session()
+ with dag_maker(
+ dag_id='test_catchup_schedule_dag',
+ schedule_interval=timedelta(days=1),
+ start_date=DEFAULT_DATE,
+ catchup=True,
+ max_active_runs=1,
+ session=session,
+ ) as dag:
+ DummyOperator(task_id='dummy')
+
+ self.scheduler_job = SchedulerJob(subdir=os.devnull)
+ self.scheduler_job.executor = MockExecutor()
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+ self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)
+ self.scheduler_job._start_queued_dagruns(session)
+ # first dagrun execution date is DEFAULT_DATE 2016-01-01T00:00:00+00:00
+ dr = DagRun.find(execution_date=DEFAULT_DATE, session=session)[0]
+ ti = dr.get_task_instance(task_id='dummy')
+ ti.state = State.SUCCESS
+ session.merge(ti)
+ session.flush()
+
+ self.scheduler_job._schedule_dag_run(dr, session)
+ session.flush()
+
+ # Run the second time so _update_dag_next_dagrun will run
+ self.scheduler_job._schedule_dag_run(dr, session)
+ session.flush()
+
+ dag.catchup = False
Review comment:
Perhaps worth a comment here as to why we are doing this. Alternatively, you could
use freeze_time to queue and schedule dagruns at the DAG start date, then, outside of
the freeze_time block, queue another dagrun and confirm that it has the latest
execution date?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]