ashb commented on a change in pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#discussion_r583633158



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2645,6 +2645,79 @@ def 
test_scheduler_verify_pool_full_2_slots_per_task(self):
         # As tasks require 2 slots, only 3 can fit into 6 available
         assert len(task_instances_list) == 3
 
+    def test_scheduler_keeps_scheduling_when_a_pool_is_full(self):
+        """
+        Test task instances in a pool that isn't full keep getting scheduled 
even when a pool is full.
+        """
+        dag_d1 = 
DAG(dag_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_d1', 
start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_t1',
+            dag=dag_d1,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p1',
+            bash_command='echo hi',
+        )
+
+        dag_d2 = 
DAG(dag_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_d2', 
start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_t2',
+            dag=dag_d2,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p2',
+            bash_command='echo hi',
+        )
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
+        dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
+        dagbag.sync_to_db()
+
+        session = settings.Session()
+        pool_p1 = 
Pool(pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p1', slots=1)
+        pool_p2 = 
Pool(pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p2', slots=10)
+        session.add(pool_p1)
+        session.add(pool_p2)
+        session.commit()
+
+        dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+        scheduler.processor_agent = mock.MagicMock()
+
+        # Create 5 dagruns for each DAG.
+        # To increase the chances the TIs from the "full" pool will get 
retrieved first, we schedule all
+        # TIs from the first dag first.
+        date = DEFAULT_DATE
+        for _ in range(5):
+            dr = dag_d1.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                execution_date=date,
+                state=State.RUNNING,
+            )
+            scheduler._schedule_dag_run(dr, {}, session)
+            date = dag_d1.following_schedule(date)
+
+        date = DEFAULT_DATE
+        for _ in range(5):
+            dr = dag_d2.create_dagrun(
+                run_type=DagRunType.SCHEDULED,
+                execution_date=date,
+                state=State.RUNNING,
+            )
+            scheduler._schedule_dag_run(dr, {}, session)
+            date = dag_d2.following_schedule(date)
+
+        scheduler._executable_task_instances_to_queued(max_tis=2, 
session=session)
+        task_instances_list2 = 
scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
+
+        # Make sure we get TIs from a non-full pool in the 2nd list
+        assert len(task_instances_list2) > 0
+        assert all(task_instance.pool != 
'test_scheduler_keeps_scheduling_when_a_pool_is_full_p1'

Review comment:
       Black is going to want to reformat this I think




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to