soltanianalytics commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-757374736


   > This needs unit tests adding to prevent future regressions
   
   I tried writing a test for this but didn't get too far. Could you give me a 
hint on how and what to do?
   
   I thought about something along those lines, but I'm pretty sure I'm missing 
some core concepts and in any case, it's not finished:
   
   ```Python
        def test_do_schedule_cleared_tasks_with_max_active_runs(self):
            """
            Test that the scheduler schedules tasks of a DagRun in order of
            execution date when tasks in more than max_active_runs DagRuns
            were cleared.

            NOTE: this is an unfinished draft — the ``???`` sections below are
            placeholders for steps not yet implemented.
            """
            schedule_interval = timedelta(hours=1)
            task_id = "dummy1"

            # Scheduler under test; do_update=False keeps the MockExecutor from
            # auto-completing queued task instances, so we can inspect states.
            job = SchedulerJob(subdir=os.devnull)
            job.executor = MockExecutor(do_update=False)
            job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

            # max_active_runs=1: only a single DagRun may be active at a time,
            # which is the condition this test exercises.
            with DAG(
                dag_id='test_schedule_cleared_tasks_with_max_active_runs',
                start_date=DEFAULT_DATE,
                end_date=DEFAULT_DATE + 3 * schedule_interval,
                schedule_interval=schedule_interval,
                max_active_runs=1,
            ) as dag:
                # Can't use DummyOperator as that goes straight to success
                task1 = BashOperator(task_id=task_id, bash_command='true')

            # Serialize the DAG to the DB so the scheduler (which reads DAGs
            # from the DB) can see it.
            session = settings.Session()
            dagbag = DagBag(
                dag_folder=os.devnull,
                include_examples=False,
                read_dags_from_db=True,
            )
            dagbag.bag_dag(dag=dag, root_dag=dag)
            dagbag.sync_to_db(session=session)

            # Create three DagRuns with different execution_dates
            dag_runs = [
                dag.create_dagrun(
                    run_type=DagRunType.SCHEDULED,
                    execution_date=DEFAULT_DATE + i * schedule_interval,
                    state=State.RUNNING,
                    session=session,
                ) for i in range(3)
            ]
            dagbag.sync_to_db(session=session)

            # Only the earliest DagRun's task should be queued, since
            # max_active_runs=1 — the other two stay unscheduled.
            assert job._do_scheduling(session) == 1
            assert dag_runs[0].get_task_instance(task_id).state == State.QUEUED
            assert dag_runs[1].get_task_instance(task_id).state == State.NONE
            assert dag_runs[2].get_task_instance(task_id).state == State.NONE

            # TODO: do something so that the first two DagRuns succeed and the
            # third is running
            # ???


            # TODO: clear the first two DagRuns & their tasks somehow
            # ???

            # Test that job._do_scheduling(session) only queues the first TI.
            # NOTE(review): these asserts look inconsistent with the comment —
            # dag_runs[2] is expected RUNNING while dag_runs[0] is re-queued;
            # confirm which ordering is actually intended.
            assert job._do_scheduling(session) == 1
            assert dag_runs[0].get_task_instance(task_id).state == State.QUEUED
            assert dag_runs[1].get_task_instance(task_id).state == State.NONE
            assert dag_runs[2].get_task_instance(task_id).state == State.RUNNING
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to