soltanianalytics commented on pull request #13433:
URL: https://github.com/apache/airflow/pull/13433#issuecomment-757374736
> This needs unit tests adding to prevent future regressions
I tried writing a test for this but didn't get very far. Could you give me a hint
on how to approach it and what to do?
I was thinking of something along the following lines, but I'm pretty sure I'm missing
some core concepts, and in any case it's not finished:
```Python
def test_do_schedule_cleared_tasks_with_max_active_runs(self):
    """
    Test that the scheduler schedules tasks in DagRuns in order of execution
    date if tasks in more than ``max_active_runs`` DagRuns were cleared.

    Scenario: three hourly DagRuns of a single-task DAG with
    ``max_active_runs=1``. The first two runs finish successfully, the third
    is left running, then the first two runs are cleared. The scheduler is
    expected to queue only the task of the earliest cleared run.
    """
    schedule_interval = timedelta(hours=1)
    task_id = "dummy1"
    job = SchedulerJob(subdir=os.devnull)
    job.executor = MockExecutor(do_update=False)
    job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

    with DAG(
        dag_id='test_schedule_cleared_tasks_with_max_active_runs',
        start_date=DEFAULT_DATE,
        end_date=DEFAULT_DATE + 3 * schedule_interval,
        schedule_interval=schedule_interval,
        max_active_runs=1,
    ) as dag:
        # Can't use DummyOperator as that goes straight to success.
        # The operator registers itself on ``dag`` via the context manager,
        # so no local binding is needed.
        BashOperator(task_id=task_id, bash_command='true')

    session = settings.Session()
    try:
        dagbag = DagBag(
            dag_folder=os.devnull,
            include_examples=False,
            read_dags_from_db=True,
        )
        dagbag.bag_dag(dag=dag, root_dag=dag)
        dagbag.sync_to_db(session=session)

        # Create three DagRuns with consecutive execution dates.
        dag_runs = [
            dag.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=DEFAULT_DATE + i * schedule_interval,
                state=State.RUNNING,
                session=session,
            )
            for i in range(3)
        ]
        dagbag.sync_to_db(session=session)

        def ti_state(dag_run):
            # Read through the test's own session so that changes made by
            # the scheduler in that (possibly uncommitted) session are visible.
            return dag_run.get_task_instance(task_id, session=session).state

        # Queue the task for the first DagRun, and no other.
        assert job._do_scheduling(session) == 1
        assert ti_state(dag_runs[0]) == State.QUEUED
        assert ti_state(dag_runs[1]) == State.NONE
        assert ti_state(dag_runs[2]) == State.NONE

        # Make the first two DagRuns (and their task instances) succeed and
        # leave the third one running.
        final_states = (State.SUCCESS, State.SUCCESS, State.RUNNING)
        for dag_run, state in zip(dag_runs, final_states):
            ti = dag_run.get_task_instance(task_id, session=session)
            ti.state = state
            dag_run.state = state
            session.merge(ti)
            session.merge(dag_run)
        session.flush()

        # Clear the first two DagRuns and their task instances.
        dag.clear(
            start_date=dag_runs[0].execution_date,
            end_date=dag_runs[1].execution_date,
            session=session,
        )
        session.flush()

        # Only the task of the earliest cleared DagRun should be queued; the
        # second cleared run must wait, and the third run is untouched.
        assert job._do_scheduling(session) == 1
        assert ti_state(dag_runs[0]) == State.QUEUED
        assert ti_state(dag_runs[1]) == State.NONE
        assert ti_state(dag_runs[2]) == State.RUNNING
    finally:
        # Closing also discards anything this test left uncommitted.
        session.close()
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]