ephraimbuddy commented on pull request #19130:
URL: https://github.com/apache/airflow/pull/19130#issuecomment-949033748


   > > My patch did not work for a case where the dag was paused and the file was edited to set the catchup to False
   > 
   > Just to clarify, the paused / editing step in my example was just to get the DAG into a state where it is behind schedule. In `main` and `2.2.0`, any catchup=False dag that's behind schedule will execute all dagruns between its current execution date and the latest run.
   
   You are right. I have just learned what catchup really means. My PR only made sure no runs are created in advance. Thanks for your contribution.
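   For anyone else following along, this is roughly what catchup controls at the DAG level (a minimal sketch for illustration only, not code from this PR; the dag id and dates are made up):
   ```python
   from datetime import datetime, timedelta

   from airflow import DAG
   from airflow.operators.dummy import DummyOperator

   # A daily DAG whose start_date is far in the past.
   # With catchup=True the scheduler creates a run for every missed interval;
   # with catchup=False it skips the backlog and only creates the most recent run.
   with DAG(
       dag_id='example_catchup',
       schedule_interval=timedelta(days=1),
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ) as dag:
       DummyOperator(task_id='dummy')
   ```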
   Please add this test to `test_scheduler_job.py`:
   ```python
    def test_catchup_works_correctly(self, dag_maker):
        """Test that catchup works correctly"""
        with dag_maker(
            dag_id='test_catchup_schedule_dag',
            schedule_interval=timedelta(days=1),
            start_date=DEFAULT_DATE,
            catchup=True,
            max_active_runs=1,
        ) as dag:
            DummyOperator(task_id='dummy')

        # Set up a scheduler job with a mocked executor and processor agent
        session = settings.Session()
        self.scheduler_job = SchedulerJob(subdir=os.devnull)
        self.scheduler_job.executor = MockExecutor()
        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

        self.scheduler_job._create_dag_runs([dag_maker.dag_model], session)
        self.scheduler_job._start_queued_dagruns(session)

        # The first dagrun's execution date is DEFAULT_DATE (2016-01-01T00:00:00+00:00);
        # mark its only task successful so the run can complete
        drs = DagRun.find(execution_date=DEFAULT_DATE)
        ti = drs[0].get_task_instance(task_id='dummy')
        ti.state = State.SUCCESS
        session.merge(ti)
        session.commit()

        self.scheduler_job._schedule_dag_run(drs[0], session)
        session.flush()
        # Run a second time so _update_dag_next_dagrun will run
        self.scheduler_job._schedule_dag_run(drs[0], session)
        session.flush()

        # Turn catchup off, as if the user edited the dag file, and sync it to the DB
        dag.catchup = False
        dag.sync_to_db()
        assert dag.catchup is False

        dm = DagModel.get_dagmodel(dag.dag_id)
        self.scheduler_job._create_dag_runs([dm], session)

        # Exclude the first run
        dr = session.query(DagRun).filter(DagRun.execution_date != DEFAULT_DATE).one()
        # Check catchup worked correctly by ensuring the execution_date is quite new.
        # Our dag is a daily dag, so the new run must not be part of the old backlog.
        assert dr.execution_date > (timezone.utcnow() - timedelta(days=2))
   ```
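   If it helps, the new test can then be run on its own with something like `pytest tests/jobs/test_scheduler_job.py -k test_catchup_works_correctly` (assuming the file sits in its usual tests/jobs/ location).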

