bperson opened a new pull request #14476:
URL: https://github.com/apache/airflow/pull/14476


   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   Currently the scheduler can get stuck in a loop if the query tweaked in this PR keeps returning a set of task instances that aren't schedulable because all of their respective pools are full. This is even more apparent when `max_tis` ends up low after the various `min`s in the scheduler's main-loop code path (either because your overall pool open slots are low, your parallelism is low, or you've set `max_tis_per_query` too low).
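
A rough sketch of how those `min`s interact (the function and parameter names here are illustrative, not the actual Airflow internals):

```python
# Illustrative sketch: the number of candidate TIs pulled by the scheduler's
# query is clamped by several independent limits, so it can get small fast.
def effective_max_tis(open_pool_slots: int, parallelism: int,
                      currently_running: int, max_tis_per_query: int) -> int:
    """The query limit is the smallest of the configured caps."""
    return min(open_pool_slots, parallelism - currently_running, max_tis_per_query)

# With few open slots overall, the query may return only a handful of TIs,
# and if they all sit in a full pool, none of them can actually be queued:
print(effective_max_tis(open_pool_slots=2, parallelism=32,
                        currently_running=0, max_tis_per_query=512))
```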
   
   The unit test reproduces the issue (though I'm still not convinced it's the best way to show it; I've included a "setup" further down, and perhaps this should go into an integration test instead?):
   
   The idea behind the test setup is to fill the set of schedulable TIs returned by that query with TIs that starve a first pool of its open slots, then add another set of schedulable TIs that should be easy to schedule and execute because their execution pool is much bigger. Without filtering out TIs from the first pool (which get discarded further down that code path anyway), we end up with a scheduler that is stuck until the first pool works through its backlog; the second pool is starved for no reason other than the scheduler never getting to it:
   
   Using a minimal "real-life" example (you need to create a `starving_pool` with only a couple of slots):
   ```
   [2021-02-26 08:56:43,604] {scheduler_job.py:950} INFO - 2 tasks up for 
execution:
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:20:00+00:00 
[scheduled]>
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:10:00+00:00 
[scheduled]>
   [2021-02-26 08:56:43,606] {scheduler_job.py:979} INFO - Figuring out tasks 
to run in Pool(name=starving_pool) with 0 open slots and 2 task instances ready 
to be queued
   [2021-02-26 08:56:43,606] {scheduler_job.py:994} INFO - Not scheduling since 
there are 0 open slots in pool starving_pool
   [2021-02-26 08:56:43,606] {scheduler_job.py:1072} INFO - Setting the 
following tasks to queued state:
   
   [2021-02-26 08:56:44,757] {scheduler_job.py:950} INFO - 2 tasks up for 
execution:
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:20:00+00:00 
[scheduled]>
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:10:00+00:00 
[scheduled]>
   [2021-02-26 08:56:44,759] {scheduler_job.py:979} INFO - Figuring out tasks 
to run in Pool(name=starving_pool) with 0 open slots and 2 task instances ready 
to be queued
   [2021-02-26 08:56:44,759] {scheduler_job.py:994} INFO - Not scheduling since 
there are 0 open slots in pool starving_pool
   [2021-02-26 08:56:44,759] {scheduler_job.py:1072} INFO - Setting the 
following tasks to queued state:
   
   [2021-02-26 08:56:45,068] {scheduler_job.py:950} INFO - 2 tasks up for 
execution:
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:20:00+00:00 
[scheduled]>
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:10:00+00:00 
[scheduled]>
   [2021-02-26 08:56:45,070] {scheduler_job.py:979} INFO - Figuring out tasks 
to run in Pool(name=starving_pool) with 0 open slots and 2 task instances ready 
to be queued
   [2021-02-26 08:56:45,070] {scheduler_job.py:994} INFO - Not scheduling since 
there are 0 open slots in pool starving_pool
   [2021-02-26 08:56:45,070] {scheduler_job.py:1072} INFO - Setting the 
following tasks to queued state:
   
   [2021-02-26 08:56:45,309] {scheduler_job.py:950} INFO - 2 tasks up for 
execution:
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:20:00+00:00 
[scheduled]>
           <TaskInstance: starving_pool.sleeper 2021-02-26 03:10:00+00:00 
[scheduled]>
   [2021-02-26 08:56:45,311] {scheduler_job.py:979} INFO - Figuring out tasks 
to run in Pool(name=starving_pool) with 0 open slots and 2 task instances ready 
to be queued
   [2021-02-26 08:56:45,311] {scheduler_job.py:994} INFO - Not scheduling since 
there are 0 open slots in pool starving_pool
   ```
   
   `starved_dag.py`
    ```python
    """DAG starved even though its TIs are executable in a non-starved pool
    (the default pool)."""
    from datetime import timedelta

    from airflow.models.dag import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.dates import days_ago

    DAG_NAME = 'starved_dag'

    default_args = {'owner': 'airflow', 'start_date': days_ago(0)}
    dag = DAG(
        DAG_NAME,
        schedule_interval='*/10 * * * *',
        dagrun_timeout=timedelta(minutes=6),
        default_args=default_args,
    )

    echoer = BashOperator(
        task_id='echoer',
        bash_command='echo "bonjour"',
        dag=dag,
    )

    if __name__ == "__main__":
        dag.cli()
    ```
   
   `starving_pool.py`
    ```python
    """DAG to starve the schedulable-TIs query."""
    from datetime import timedelta

    from airflow.models.dag import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.dates import days_ago

    DAG_NAME = 'starving_pool'

    default_args = {'owner': 'airflow', 'start_date': days_ago(0)}
    dag = DAG(
        DAG_NAME,
        schedule_interval='*/10 * * * *',
        dagrun_timeout=timedelta(minutes=6),
        default_args=default_args,
    )

    sleeper = BashOperator(
        task_id='sleeper',
        bash_command='sleep 300',
        dag=dag,
        pool='starving_pool',
    )

    if __name__ == "__main__":
        dag.cli()
    ```
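
For what it's worth, the gist of the change can be sketched in plain Python (a simplified, in-memory version; the actual PR tweaks the DB query, and `filter_starved_pools` is a hypothetical name):

```python
# Simplified sketch of the fix's idea: drop candidate TIs whose pool has no
# open slots before they can crowd out schedulable TIs from other pools.
def filter_starved_pools(task_instances, pool_open_slots):
    """Drop candidate TIs whose pool has no open slots left."""
    starved = {pool for pool, slots in pool_open_slots.items() if slots <= 0}
    return [ti for ti in task_instances if ti["pool"] not in starved]

candidates = [
    {"task_id": "sleeper", "pool": "starving_pool"},
    {"task_id": "echoer", "pool": "default_pool"},
]
# With starving_pool full, only the default-pool TI remains a candidate:
print(filter_starved_pools(candidates, {"starving_pool": 0, "default_pool": 128}))
```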
   
   We hit this issue in our production stack with real-life DAGs. To bypass it, we're currently running with an insanely big unused pool (`999999` slots :D), an insanely big `max_tis_per_query`, and an insanely big `parallelism` (even though we don't have the actual workers behind it), so that the number of TIs returned by the tweaked query is always larger than the maximum number of TIs stuck in starved pools at any point in time.
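
For reference, that stop-gap roughly corresponds to settings like these (the values and pool name are illustrative; `max_tis_per_query` lives in the `[scheduler]` config section and `parallelism` in `[core]`):

```shell
# A huge unused pool inflates the overall open-slot count:
airflow pools set big_unused_pool 999999 "workaround: inflate open pool slots"

# Oversized scheduler limits (no actual workers behind them):
export AIRFLOW__CORE__PARALLELISM=999999
export AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY=999999
```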
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)**
 for more information.
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   

