This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c5a941b9b590eaa528e0b41efa62b63027327664 Author: pulsar314 <[email protected]> AuthorDate: Thu Jun 25 22:42:03 2020 +0300 Fixes treatment of open slots in scheduler (#9316) (#9505) Makes the scheduler take into account the number of pool slots each task requires. If there are fewer open slots than a task requires, the task is not queued. (cherry picked from commit 0e31f186d38b776710080ba07be50eedf42c48a7) --- airflow/jobs/scheduler_job.py | 29 +++++--- tests/jobs/test_scheduler_job.py | 144 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+), 8 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 97f3929..685b57f 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -969,6 +969,9 @@ class SchedulerJob(BaseJob): dag_concurrency_map, task_concurrency_map = self.__get_concurrency_maps( states=STATES_TO_COUNT_AS_RUNNING, session=session) + num_tasks_in_executor = 0 + num_starving_tasks_total = 0 + # Go through each pool, and queue up a task for execution if there are # any open slots in the pool. for pool, task_instances in pool_to_task_instances.items(): @@ -992,9 +995,7 @@ class SchedulerJob(BaseJob): priority_sorted_task_instances = sorted( task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)) - # Number of tasks that cannot be scheduled because of no open slot in pool num_starving_tasks = 0 - num_tasks_in_executor = 0 for current_index, task_instance in enumerate(priority_sorted_task_instances): if open_slots <= 0: self.log.info( @@ -1002,7 +1003,9 @@ class SchedulerJob(BaseJob): open_slots, pool ) # Can't schedule any more since there are no more open slots. 
- num_starving_tasks = len(priority_sorted_task_instances) - current_index + num_unhandled = len(priority_sorted_task_instances) - current_index + num_starving_tasks += num_unhandled + num_starving_tasks_total += num_unhandled break # Check to make sure that the task concurrency of the DAG hasn't been @@ -1045,8 +1048,17 @@ class SchedulerJob(BaseJob): num_tasks_in_executor += 1 continue + if task_instance.pool_slots > open_slots: + self.log.info("Not executing %s since it requires %s slots " + "but there are %s open slots in the pool %s.", + task_instance, task_instance.pool_slots, open_slots, pool) + num_starving_tasks += 1 + num_starving_tasks_total += 1 + # Though we can execute tasks with lower priority if there's enough room + continue + executable_tis.append(task_instance) - open_slots -= 1 + open_slots -= task_instance.pool_slots dag_concurrency_map[dag_id] += 1 task_concurrency_map[(task_instance.dag_id, task_instance.task_id)] += 1 @@ -1056,10 +1068,11 @@ class SchedulerJob(BaseJob): pools[pool_name].open_slots()) Stats.gauge('pool.used_slots.{pool_name}'.format(pool_name=pool_name), pools[pool_name].occupied_slots()) - Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine)) - Stats.gauge('scheduler.tasks.running', num_tasks_in_executor) - Stats.gauge('scheduler.tasks.starving', num_starving_tasks) - Stats.gauge('scheduler.tasks.executable', len(executable_tis)) + + Stats.gauge('scheduler.tasks.pending', len(task_instances_to_examine)) + Stats.gauge('scheduler.tasks.running', num_tasks_in_executor) + Stats.gauge('scheduler.tasks.starving', num_starving_tasks_total) + Stats.gauge('scheduler.tasks.executable', len(executable_tis)) task_instance_str = "\n\t".join( [repr(x) for x in executable_tis]) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 161e479..2188d8b 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -48,6 +48,7 @@ from airflow.models import DAG, DagBag, 
DagModel, DagRun, Pool, SlaMiss, \ TaskInstance as TI, errors from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator +from airflow.serialization.serialized_objects import SerializedDAG from airflow.utils import timezone from airflow.utils.dag_processing import SimpleDag, SimpleDagBag, list_py_file_paths from airflow.utils.dates import days_ago @@ -1901,6 +1902,149 @@ class SchedulerJobTest(unittest.TestCase): self.assertIsNotNone(dr) self.assertEqual(dr.execution_date, timezone.datetime(2016, 1, 1, 10, 10)) + def test_scheduler_verify_pool_full_2_slots_per_task(self): + """ + Test task instances not queued when pool is full. + + Variation with non-default pool_slots + """ + dag = DAG( + dag_id='test_scheduler_verify_pool_full_2_slots_per_task', + start_date=DEFAULT_DATE) + + DummyOperator( + task_id='dummy', + dag=dag, + owner='airflow', + pool='test_scheduler_verify_pool_full_2_slots_per_task', + pool_slots=2, + ) + + session = settings.Session() + pool = Pool(pool='test_scheduler_verify_pool_full_2_slots_per_task', slots=6) + session.add(pool) + orm_dag = DagModel(dag_id=dag.dag_id) + orm_dag.is_paused = False + session.merge(orm_dag) + session.commit() + + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + + scheduler = SchedulerJob(executor=self.null_exec) + + # Create 5 dagruns, which will create 5 task instances. + for _ in range(5): + scheduler.create_dag_run(dag) + task_instances_list = [] + scheduler._process_task_instances(dag, task_instances_list=task_instances_list) + self.assertEqual(len(task_instances_list), 5) + dagbag = self._make_simple_dag_bag([dag]) + + # Recreated part of the scheduler here, to kick off tasks -> executor + for ti_key in task_instances_list: + task = dag.get_task(ti_key[1]) + ti = TaskInstance(task, ti_key[2]) + # Task starts out in the scheduled state. 
All tasks in the + # scheduled state will be sent to the executor + ti.state = State.SCHEDULED + + # Also save this task instance to the DB. + session.merge(ti) + session.commit() + + self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition") + scheduler._execute_task_instances(dagbag, (State.SCHEDULED,), session=session) + + # As tasks require 2 slots, only 3 can fit into 6 available + self.assertEqual(len(scheduler.executor.queued_tasks), 3) + + def test_scheduler_verify_priority_and_slots(self): + """ + Test task instances with higher priority are not queued + when pool does not have enough slots. + + Though tasks with lower priority might be executed. + """ + dag = DAG( + dag_id='test_scheduler_verify_priority_and_slots', + start_date=DEFAULT_DATE) + + # Medium priority, not enough slots + DummyOperator( + task_id='test_scheduler_verify_priority_and_slots_t0', + dag=dag, + owner='airflow', + pool='test_scheduler_verify_priority_and_slots', + pool_slots=2, + priority_weight=2, + ) + # High priority, occupies first slot + DummyOperator( + task_id='test_scheduler_verify_priority_and_slots_t1', + dag=dag, + owner='airflow', + pool='test_scheduler_verify_priority_and_slots', + pool_slots=1, + priority_weight=3, + ) + # Low priority, occupies second slot + DummyOperator( + task_id='test_scheduler_verify_priority_and_slots_t2', + dag=dag, + owner='airflow', + pool='test_scheduler_verify_priority_and_slots', + pool_slots=1, + priority_weight=1, + ) + + session = settings.Session() + pool = Pool(pool='test_scheduler_verify_priority_and_slots', slots=2) + session.add(pool) + orm_dag = DagModel(dag_id=dag.dag_id) + orm_dag.is_paused = False + session.merge(orm_dag) + session.commit() + + dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) + + scheduler = SchedulerJob(executor=self.null_exec) + + scheduler.create_dag_run(dag) + task_instances_list = [] + scheduler._process_task_instances(dag, task_instances_list=task_instances_list) + 
self.assertEqual(len(task_instances_list), 3) + dagbag = self._make_simple_dag_bag([dag]) + + # Recreated part of the scheduler here, to kick off tasks -> executor + for ti_key in task_instances_list: + task = dag.get_task(ti_key[1]) + ti = TaskInstance(task, ti_key[2]) + # Task starts out in the scheduled state. All tasks in the + # scheduled state will be sent to the executor + ti.state = State.SCHEDULED + + # Also save this task instance to the DB. + session.merge(ti) + session.commit() + + self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition") + scheduler._execute_task_instances(dagbag, (State.SCHEDULED, ), session=session) + + # Only second and third + self.assertEqual(len(scheduler.executor.queued_tasks), 2) + + ti0 = session.query(TaskInstance)\ + .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t0').first() + self.assertEqual(ti0.state, State.SCHEDULED) + + ti1 = session.query(TaskInstance)\ + .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t1').first() + self.assertEqual(ti1.state, State.QUEUED) + + ti2 = session.query(TaskInstance)\ + .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t2').first() + self.assertEqual(ti2.state, State.QUEUED) + def test_scheduler_reschedule(self): """ Checks if tasks that are not taken up by the executor
