This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit af22a533938ec7054528ece8a7469da4303946ee
Author: Tanel Kiis <[email protected]>
AuthorDate: Mon May 9 18:12:40 2022 +0300

    Pools with negative open slots should not block other pools (#23143)
    
    (cherry picked from commit 7132be2f11db24161940f57613874b4af86369c7)
---
 airflow/jobs/scheduler_job.py    |  2 +-
 airflow/models/pool.py           |  6 +-----
 tests/jobs/test_scheduler_job.py | 36 ++++++++++++++++++++++++++++++++++++
 3 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 6f366b3987..b4f2714e3a 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -276,7 +276,7 @@ class SchedulerJob(BaseJob):
 
         # If the pools are full, there is no point doing anything!
         # If _somehow_ the pool is overfull, don't let the limit go negative - it breaks SQL
-        pool_slots_free = max(0, sum(pool['open'] for pool in pools.values()))
+        pool_slots_free = sum(max(0, pool['open']) for pool in pools.values())
 
         if pool_slots_free == 0:
             self.log.debug("All pools are full!")
diff --git a/airflow/models/pool.py b/airflow/models/pool.py
index 3a65e6c188..f195b93d42 100644
--- a/airflow/models/pool.py
+++ b/airflow/models/pool.py
@@ -191,11 +191,7 @@ class Pool(Base):
 
         # calculate open metric
         for pool_name, stats_dict in pools.items():
-            if stats_dict["total"] == -1:
-                # -1 means infinite
-                stats_dict["open"] = -1
-            else:
-                stats_dict["open"] = stats_dict["total"] - stats_dict["running"] - stats_dict["queued"]
+            stats_dict["open"] = stats_dict["total"] - stats_dict["running"] - stats_dict["queued"]
 
         return pools
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 4a4b89e619..a4fe6a0b3e 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1135,6 +1135,42 @@ class TestSchedulerJob:
 
         session.rollback()
 
+    def test_find_executable_task_instances_negative_open_pool_slots(self, dag_maker):
+        """
+        Pools with negative open slots should not block other pools.
+        Negative open slots can happen when reducing the number of total slots in a pool
+        while tasks are running in that pool.
+        """
+        set_default_pool_slots(0)
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        session = settings.Session()
+
+        pool1 = Pool(pool='pool1', slots=1)
+        pool2 = Pool(pool='pool2', slots=1)
+
+        session.add(pool1)
+        session.add(pool2)
+
+        dag_id = 'SchedulerJobTest.test_find_executable_task_instances_negative_open_pool_slots'
+        with dag_maker(dag_id=dag_id):
+            op1 = EmptyOperator(task_id='op1', pool='pool1')
+            op2 = EmptyOperator(task_id='op2', pool='pool2', pool_slots=2)
+
+        dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+
+        ti1 = dr1.get_task_instance(op1.task_id, session)
+        ti2 = dr1.get_task_instance(op2.task_id, session)
+        ti1.state = State.SCHEDULED
+        ti2.state = State.RUNNING
+        session.flush()
+
+        res = self.scheduler_job._executable_task_instances_to_queued(max_tis=1, session=session)
+        assert 1 == len(res)
+        assert res[0].key == ti1.key
+
+        session.rollback()
+
     @mock.patch('airflow.jobs.scheduler_job.Stats.gauge')
     def test_emit_pool_starving_tasks_metrics(self, mock_stats_gauge, dag_maker):
         self.scheduler_job = SchedulerJob(subdir=os.devnull)

Reply via email to