vapiravfif opened a new issue #19622:
URL: https://github.com/apache/airflow/issues/19622


   ### Apache Airflow version
   
   2.0.2
   
   ### Operating System
   
   amzn linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   AIRFLOW__CORE__PARALLELISM: "128"
   worker_concurrency: 32
   2 Celery workers
   MySQL 8.0.23 RDS as a DB backend
   
   ### What happened
   
   * Tasks get stuck in the "scheduled" state for hours; the task details say "All dependencies are met but the task instance is not running"
   * The stuck tasks are executed eventually
   * Usually, DAGs with more than 100 tasks are running at the same time
   * The big DAGs are limited by the DAG-level `concurrency` parameter to 10 tasks at a time
   * Workers and pools have plenty of free slots
   * If the big DAGs are switched off, the starving tasks are picked up immediately, even if tasks from the big DAGs are still running
   * In the scheduler logs, the starving tasks do not appear in the "tasks up for execution" list
   * The number of concurrent tasks actually running is around 30 in total across both workers (out of 64 available slots)
   
   ### What you expected to happen
   
   As there are enough slots on the workers and in the pools, I expect tasks that are ready and actually *can* run to be picked up and moved to queued by the scheduler.
   
   ### How to reproduce
   
   This example DAG file should reproduce the problem in an environment with at least 20-25 available slots and a core parallelism of 128. The DAG that gets starved is "tester_multi_load_3". Not every task is starved, but in my environment there were gaps of up to 20 minutes between task executions. I guess the starvation time depends on ordering (?), as I'm not adding any weights...
   
   <details><summary>CLICK ME</summary>
   <p>
   
   
   ```python
   import os
   import time
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   
   
   default_args = {
       'owner': 'Tester',
       'depends_on_past': False,
       'start_date': datetime(2021, 7, 17),
       'retries': 5
   }
   
   
   def sleep(timer):
       # Fall back to 60 seconds when no timer is given (the tasks below pass op_args=[""])
       if not timer:
           timer = 60
       print(f'Timer is {timer}')
       time.sleep(timer)
   
   
   with DAG(
           dag_id=os.path.basename(__file__).replace('.py', '') + '_1',  # name of the dag
           default_args=default_args,
           concurrency=10,
           max_active_runs=5,
           schedule_interval='@hourly',
           orientation='LR',
           tags=['testers']
   
   ) as dag1:
   
       for i in range(150):
           t = PythonOperator(
               task_id=f'python_{i}',
               python_callable=sleep,
               op_args=[""],
               priority_weight=-100,
           )
   
   with DAG(
           os.path.basename(__file__).replace('.py', '') + '_2',  # name of the dag
           default_args=default_args,
           concurrency=7,
           max_active_runs=2,
           schedule_interval='@hourly',
           orientation='LR',
           tags=['testers']
   
   ) as dag2:
   
       for i in range(150):
           t = PythonOperator(task_id=f'python_{i}',
                              python_callable=sleep,
                              op_args=[""],
                              )
   
   
   with DAG(
           os.path.basename(__file__).replace('.py', '') + '_3',  # name of the dag
           default_args=default_args,
           concurrency=1,
           max_active_runs=1,
           schedule_interval='@hourly',
           orientation='LR',
           tags=['testers']
   
   ) as dag3:
   
       t1 = PythonOperator(task_id='python', python_callable=sleep, op_args=[""])
   
       for i in range(10):
           t2 = PythonOperator(task_id=f'python_{i}', python_callable=sleep, op_args=[""])
           t1 >> t2
           t1 = t2
   ```
   
   </p>
   </details>
   
   
   ### Anything else
   
   Digging around the code, I found that there's a limit on the query the scheduler performs [here](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/scheduler_job.py#L928), which comes from [here](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/scheduler_job.py#L1141) and seems to be calculated from the global `parallelism` value.
   So what actually happens is that the scheduler queries the DB with a limit and gets back a partial list of tasks that cannot actually be executed because of the DAG-level concurrency. It only gets to other tasks that are able to run when there's a window between big DAG executions. Increasing `parallelism` to 1024 solved the issue in our case.
   The `parallelism` parameter is very confusing in this case, because it should indicate how many tasks can be `run concurrently`, but it actually limits the scheduler's ability to move tasks from scheduled to queued...
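
The effect described above can be sketched with a toy model. This is not Airflow's scheduler code: the function name `pick_executable`, the DAG ids, and the task counts are made up for illustration, and the list slice merely stands in for the `LIMIT` on the real query. It assumes the blocked tasks of a big DAG sort ahead of the small DAG's task.

```python
from collections import Counter


def pick_executable(scheduled, running_per_dag, dag_concurrency, query_limit):
    """Toy model: look at only the first `query_limit` scheduled tasks,
    then skip those whose DAG is already at its concurrency cap."""
    candidates = scheduled[:query_limit]  # stands in for the SQL LIMIT
    running = Counter(running_per_dag)
    picked = []
    for dag_id, task_id in candidates:
        if running[dag_id] < dag_concurrency[dag_id]:
            picked.append((dag_id, task_id))
            running[dag_id] += 1
    return picked


# 140 scheduled tasks of a big DAG (already running 10 of its allowed 10)
# come back from the query ahead of a single task from a small DAG.
scheduled = [("big_dag", f"python_{i}") for i in range(140)]
scheduled.append(("small_dag", "python"))
running_per_dag = {"big_dag": 10}
dag_concurrency = {"big_dag": 10, "small_dag": 1}

starved = pick_executable(scheduled, running_per_dag, dag_concurrency, query_limit=128)
unblocked = pick_executable(scheduled, running_per_dag, dag_concurrency, query_limit=1024)

print(starved)    # [] - every fetched task belongs to the capped DAG
print(unblocked)  # [('small_dag', 'python')]
```

With the smaller limit, every fetched row belongs to the capped DAG, so nothing is queued even though the small DAG's task could run. With the larger limit the small DAG's task falls inside the fetched window, which matches the observation that raising `parallelism` made the starvation go away.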
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   



