This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit af22a533938ec7054528ece8a7469da4303946ee
Author: Tanel Kiis <[email protected]>
AuthorDate: Mon May 9 18:12:40 2022 +0300

    Pools with negative open slots should not block other pools (#23143)

    (cherry picked from commit 7132be2f11db24161940f57613874b4af86369c7)
---
 airflow/jobs/scheduler_job.py    |  2 +-
 airflow/models/pool.py           |  6 +-----
 tests/jobs/test_scheduler_job.py | 36 ++++++++++++++++++++++++++++++++++++
 3 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 6f366b3987..b4f2714e3a 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -276,7 +276,7 @@ class SchedulerJob(BaseJob):
 
         # If the pools are full, there is no point doing anything!
         # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
-        pool_slots_free = max(0, sum(pool['open'] for pool in pools.values()))
+        pool_slots_free = sum(max(0, pool['open']) for pool in pools.values())
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 3a65e6c188..f195b93d42 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -191,11 +191,7 @@ class Pool(Base):
 
         # calculate open metric
         for pool_name, stats_dict in pools.items():
-            if stats_dict["total"] == -1:
-                # -1 means infinite
-                stats_dict["open"] = -1
-            else:
-                stats_dict["open"] = stats_dict["total"] - stats_dict["running"] - stats_dict["queued"]
+            stats_dict["open"] = stats_dict["total"] - stats_dict["running"] - stats_dict["queued"]
 
         return pools
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 4a4b89e619..a4fe6a0b3e 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1135,6 +1135,42 @@ class TestSchedulerJob:
 
         session.rollback()
 
+    def test_find_executable_task_instances_negative_open_pool_slots(self, dag_maker):
+        """
+        Pools with negative open slots should not block other pools.
+        Negative open slots can happen when reducing the number of total slots in a pool
+        while tasks are running in that pool.
+        """
+        set_default_pool_slots(0)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        pool1 = Pool(pool='pool1', slots=1)
+        pool2 = Pool(pool='pool2', slots=1)
+
+        session.add(pool1)
+        session.add(pool2)
+
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_negative_open_pool_slots'
+        with dag_maker(dag_id=dag_id):
+            op1 = EmptyOperator(task_id='op1', pool='pool1')
+            op2 = EmptyOperator(task_id='op2', pool='pool2', pool_slots=2)
+
+        dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ti1 = dr1.get_task_instance(op1.task_id, session)
+        ti2 = dr1.get_task_instance(op2.task_id, session)
+        ti1.state = State.SCHEDULED
+        ti2.state = State.RUNNING
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        assert 1 == len(res)
+        assert res[0].key == ti1.key
+
+        session.rollback()
+
     @mock.patch('airflow.jobs.scheduler_job.Stats.gauge')
     def test_emit_pool_starving_tasks_metrics(self, mock_stats_gauge, dag_maker):
         self.scheduler_job = SchedulerJob(subdir=os.devnull)
