bperson opened a new issue #14515:
URL: https://github.com/apache/airflow/issues/14515


   **Apache Airflow version**: v2.0.0 and up
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): not tested with K8s
   **Environment**:
   all
   
   **What happened**:
   
   Run the unit test included below, or create an infinite pool (`-1` slots) and tasks that should be executed in that pool. The tasks are never scheduled, and the scheduler logs:
   ```
   INFO     airflow.jobs.scheduler_job.SchedulerJob:scheduler_job.py:991 Not scheduling since there are -1 open slots in pool test_scheduler_verify_infinite_pool
   ```
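
The log line suggests that the `-1` sentinel is being read as a literal slot count. A minimal, Airflow-independent sketch of that failure mode follows; the function name and the `> 0` guard are hypothetical and are not copied from `scheduler_job.py`:

```python
INFINITE = -1  # sentinel for an infinite pool, as used in this report


def can_schedule(open_slots: int) -> bool:
    """Hypothetical guard: any non-positive value is treated as 'pool is full'."""
    return open_slots > 0


# The sentinel is interpreted as "-1 open slots", so nothing is scheduled.
print(can_schedule(INFINITE))  # False
print(can_schedule(5))         # True
```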
   
   **What you expected to happen**:
   
   Either the tasks should be scheduled, or support for infinite-slot pools should be dropped.
   
   **How to reproduce it**:
   The easiest way is this unit test:
   ```
   def test_scheduler_verify_infinite_pool(self):
       """
       Test that TIs are still scheduled if we only have one infinite pool.
       """
       dag = DAG(dag_id='test_scheduler_verify_infinite_pool', start_date=DEFAULT_DATE)
       BashOperator(
           task_id='test_scheduler_verify_infinite_pool_t0',
           dag=dag,
           owner='airflow',
           pool='test_scheduler_verify_infinite_pool',
           bash_command='echo hi',
       )
   
       dagbag = DagBag(
           dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"),
           include_examples=False,
           read_dags_from_db=True,
       )
       dagbag.bag_dag(dag=dag, root_dag=dag)
       dagbag.sync_to_db()
   
       session = settings.Session()
       pool = Pool(pool='test_scheduler_verify_infinite_pool', slots=-1)
       session.add(pool)
       session.commit()
   
       dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
   
       scheduler = SchedulerJob(executor=self.null_exec)
       scheduler.processor_agent = mock.MagicMock()
   
       dr = dag.create_dagrun(
           run_type=DagRunType.SCHEDULED,
           execution_date=DEFAULT_DATE,
           state=State.RUNNING,
       )
       scheduler._schedule_dag_run(dr, {}, session)
   
       task_instances_list = scheduler._executable_task_instances_to_queued(max_tis=32, session=session)
   
       # Let's make sure we don't end up with a `max_tis` == 0
       assert len(task_instances_list) >= 1
   ```
   
   **Anything else we need to know**:
   
   Overall, I'm not sure whether it's worth fixing this in each of these spots:
   
https://github.com/bperson/airflow/blob/master/airflow/jobs/scheduler_job.py#L908
   
https://github.com/bperson/airflow/blob/master/airflow/jobs/scheduler_job.py#L971
   
https://github.com/bperson/airflow/blob/master/airflow/jobs/scheduler_job.py#L988
   
https://github.com/bperson/airflow/blob/master/airflow/jobs/scheduler_job.py#L1041
   
https://github.com/bperson/airflow/blob/master/airflow/jobs/scheduler_job.py#L1056
   
   Or whether to disallow `-1` (infinite) slots in pools instead:
   https://github.com/bperson/airflow/blob/master/airflow/models/pool.py#L49
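
The two options can be sketched roughly as follows. This is a standalone illustration under stated assumptions, not a patch: `effective_open_slots` and `validate_slots` are hypothetical helpers and do not exist in Airflow.

```python
import math

INFINITE_SLOTS = -1  # sentinel value for an infinite pool, as used in this report


def effective_open_slots(total_slots: int, occupied_slots: int) -> float:
    """Option 1 (hypothetical): map the -1 sentinel to infinity before comparing."""
    if total_slots == INFINITE_SLOTS:
        return math.inf
    # Finite pool: never report fewer than zero open slots.
    return max(total_slots - occupied_slots, 0)


def validate_slots(slots: int) -> int:
    """Option 2 (hypothetical): reject negative slot counts when a pool is created."""
    if slots < 0:
        raise ValueError(f"Pool slots must be non-negative, got {slots}")
    return slots


# With option 1, capping a batch size by the open slots keeps working:
max_tis = min(32, effective_open_slots(INFINITE_SLOTS, occupied_slots=10))
```

With option 1, every place that compares or caps against open slots needs the same normalisation. Option 2 touches only pool creation, but it removes the feature.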


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

