ephraimbuddy commented on a change in pull request #19860:
URL: https://github.com/apache/airflow/pull/19860#discussion_r759142516
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1858,26 +1844,20 @@ def _create_dagruns():
# As tasks require 2 slots, only 3 can fit into 6 available
assert len(task_instances_list) == 3
- @pytest.mark.quarantined
def test_scheduler_keeps_scheduling_pool_full(self, dag_maker):
"""
Test task instances in a pool that isn't full keep getting scheduled
even when a pool is full.
"""
- with dag_maker(
- dag_id='test_scheduler_keeps_scheduling_pool_full_d1',
- start_date=DEFAULT_DATE,
- ):
+
+ with dag_maker(dag_id='test_scheduler_keeps_scheduling_pool_full_d1',
start_date=DEFAULT_DATE):
Review comment:
Maybe we should use the `session` fixture here and have everything share the
same session. Not a blocker, though.
```python
def test_scheduler_keeps_scheduling_pool_full(self, session, dag_maker):
    """
    Test task instances in a pool that isn't full keep getting scheduled
    even when a pool is full.

    Two DAGs are created: d1 runs in a 1-slot pool (which fills immediately)
    and d2 runs in a 10-slot pool. After scheduling 5 dagruns for each DAG,
    the second queuing pass must still pick up TIs from the non-full pool.
    """
    # Both dag_maker calls share the test's session fixture so the pools,
    # dagruns and TIs created below are all visible to the scheduler queries.
    with dag_maker(
        dag_id='test_scheduler_keeps_scheduling_pool_full_d1',
        start_date=DEFAULT_DATE,
        session=session,
    ):
        BashOperator(
            task_id='test_scheduler_keeps_scheduling_pool_full_t1',
            pool='test_scheduler_keeps_scheduling_pool_full_p1',
            bash_command='echo hi',
        )
    dag_d1 = dag_maker.dag

    with dag_maker(
        dag_id='test_scheduler_keeps_scheduling_pool_full_d2',
        start_date=DEFAULT_DATE,
        session=session,
    ):
        BashOperator(
            task_id='test_scheduler_keeps_scheduling_pool_full_t2',
            pool='test_scheduler_keeps_scheduling_pool_full_p2',
            bash_command='echo hi',
        )
    dag_d2 = dag_maker.dag

    # One pool that fills up after a single TI, one with plenty of room.
    pool_p1 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p1', slots=1)
    pool_p2 = Pool(pool='test_scheduler_keeps_scheduling_pool_full_p2', slots=10)
    session.add(pool_p1)
    session.add(pool_p2)
    session.flush()

    scheduler = SchedulerJob(executor=self.null_exec)
    scheduler.processor_agent = mock.MagicMock()

    def _create_dagruns(dag: DAG):
        # Yield 5 consecutive scheduled dagruns for `dag`, advancing the
        # data interval between runs.
        next_info = dag.next_dagrun_info(None)
        for _ in range(5):
            yield dag.create_dagrun(
                run_type=DagRunType.SCHEDULED,
                execution_date=next_info.logical_date,
                data_interval=next_info.data_interval,
                state=State.RUNNING,
            )
            next_info = dag.next_dagrun_info(next_info.data_interval)

    # Create 5 dagruns for each DAG.
    # To increase the chances the TIs from the "full" pool will get retrieved
    # first, we schedule all TIs from the first dag first.
    for dr in _create_dagruns(dag_d1):
        scheduler._schedule_dag_run(dr, session)
    for dr in _create_dagruns(dag_d2):
        scheduler._schedule_dag_run(dr, session)

    # First pass fills pool p1 (1 slot); the second pass must still return
    # schedulable TIs, all from the non-full pool p2.
    scheduler._executable_task_instances_to_queued(max_tis=2, session=session)
    task_instances_list2 = scheduler._executable_task_instances_to_queued(
        max_tis=2, session=session
    )

    # Make sure we get TIs from a non-full pool in the 2nd list
    assert len(task_instances_list2) > 0
    assert all(
        task_instance.pool != 'test_scheduler_keeps_scheduling_pool_full_p1'
        for task_instance in task_instances_list2
    )
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]