ashb commented on a change in pull request #14476:
URL: https://github.com/apache/airflow/pull/14476#discussion_r583614358



##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -2645,6 +2645,79 @@ def 
test_scheduler_verify_pool_full_2_slots_per_task(self):
         # As tasks require 2 slots, only 3 can fit into 6 available
         assert len(task_instances_list) == 3
 
+    def test_scheduler_keeps_scheduling_when_a_pool_is_full(self):
+        """
+        Test task instances in a pool that isn't full keep getting scheduled 
even when a pool is full.
+        """
+        dag_d1 = 
DAG(dag_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_d1', 
start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_t1',
+            dag=dag_d1,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p1',
+            bash_command='echo hi',
+        )
+
+        dag_d2 = 
DAG(dag_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_d2', 
start_date=DEFAULT_DATE)
+        BashOperator(
+            task_id='test_scheduler_keeps_scheduling_when_a_pool_is_full_t2',
+            dag=dag_d2,
+            owner='airflow',
+            pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p2',
+            bash_command='echo hi',
+        )
+        dagbag = DagBag(
+            dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
+            include_examples=False,
+            read_dags_from_db=True,
+        )
+        dagbag.bag_dag(dag=dag_d1, root_dag=dag_d1)
+        dagbag.bag_dag(dag=dag_d2, root_dag=dag_d2)
+        dagbag.sync_to_db()
+
+        session = settings.Session()
+        pool_p1 = 
Pool(pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p1', slots=1)
+        pool_p2 = 
Pool(pool='test_scheduler_keeps_scheduling_when_a_pool_is_full_p2', slots=10)
+        session.add(pool_p1)
+        session.add(pool_p2)
+        session.commit()
+
+        dag_d1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag_d1))
+
+        scheduler = SchedulerJob(executor=self.null_exec)
+        scheduler.processor_agent = mock.MagicMock()
+
+        # Create 5 dagruns for each DAG.
+        # To increase the chances the TIs from the "full" pool will get 
retrieved first, we schedule all TIs from the

Review comment:
       Chances? Is this test going to fail some of the time?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to