vapiravfif opened a new issue #19622:
URL: https://github.com/apache/airflow/issues/19622
### Apache Airflow version
2.0.2
### Operating System
Amazon Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
* AIRFLOW__CORE__PARALLELISM: "128"
* worker_concurrency: 32
* 2 Celery workers
* MySQL 8.0.23 RDS as a DB backend
### What happened
* Tasks get stuck in the "scheduled" state for hours; the task details page says "All dependencies are met but the task instance is not running"
* The stuck tasks are executed eventually
* Usually, at the same time, DAGs with >100 tasks are running
* The big DAGs are limited by the dag-level `concurrency` parameter to 10 tasks at a time
* Workers and pools have plenty of free slots
* If the big DAGs are switched off, the starving tasks are picked up immediately, even while tasks from the big DAGs are still running
* In the scheduler logs, the starving tasks do not appear in the "tasks up for execution" list
* The number of tasks actually running concurrently is around 30 in total across both Celery workers (out of 64 available slots); see the slot arithmetic sketch below
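For reference, a back-of-envelope version of the slot arithmetic above (a sketch using the deployment numbers from this report; the 30 running tasks is my rough observation, not an exact measurement):
```python
# Rough slot math for the deployment described above (illustrative only).
workers = 2              # Celery workers
worker_concurrency = 32  # slots per worker
available_slots = workers * worker_concurrency  # 64 executor slots in total

parallelism = 128        # AIRFLOW__CORE__PARALLELISM
observed_running = 30    # roughly what we see actually running at once

print(f'idle worker slots: {available_slots - observed_running}')       # ~34 slots sit unused
print(f'headroom under parallelism: {parallelism - observed_running}')  # ~98
```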
### What you expected to happen
As there are enough slots on the workers & pools, I expect tasks that are ready and actually *can* run to be picked up and moved to the queued state by the scheduler.
### How to reproduce
This example DAG file should reproduce the problem on an environment with at least 20-25 available slots and a core parallelism of 128. The DAG that gets starved is "tester_multi_load_3". Not every task is affected, but on my environment there were gaps of up to 20 minutes between task executions. I guess the starvation time depends on ordering (?), as I'm not adding any weights...
<details><summary>CLICK ME</summary>
<p>
```python
import os
import time
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'Tester',
    'depends_on_past': False,
    'start_date': datetime(2021, 7, 17),
    'retries': 5
}


def sleep(timer):
    if not timer:
        timer = 60
    print(f'Timer is {str(timer)}')
    time.sleep(timer)


# Big DAG #1: 150 independent tasks, capped to 10 at a time by dag-level concurrency
with DAG(
    dag_id=os.path.basename(__file__).replace('.py', '') + '_1',  # name of the dag
    default_args=default_args,
    concurrency=10,
    max_active_runs=5,
    schedule_interval='@hourly',
    orientation='LR',
    tags=['testers']
) as dag1:
    for i in range(150):
        t = PythonOperator(
            task_id=f'python_{i}',
            python_callable=sleep,
            op_args=[""],
            priority_weight=-100,
        )

# Big DAG #2: 150 independent tasks, capped to 7 at a time
with DAG(
    os.path.basename(__file__).replace('.py', '') + '_2',  # name of the dag
    default_args=default_args,
    concurrency=7,
    max_active_runs=2,
    schedule_interval='@hourly',
    orientation='LR',
    tags=['testers']
) as dag2:
    for i in range(150):
        t = PythonOperator(
            task_id=f'python_{i}',
            python_callable=sleep,
            op_args=[""],
        )

# Small DAG ("tester_multi_load_3"): a chain of tasks, one at a time -- this is the one that starves
with DAG(
    os.path.basename(__file__).replace('.py', '') + '_3',  # name of the dag
    default_args=default_args,
    concurrency=1,
    max_active_runs=1,
    schedule_interval='@hourly',
    orientation='LR',
    tags=['testers']
) as dag3:
    t1 = PythonOperator(task_id='python', python_callable=sleep, op_args=[""])
    for i in range(10):
        t2 = PythonOperator(task_id=f'python_{i}', python_callable=sleep, op_args=[""])
        t1 >> t2
        t1 = t2
```
</p>
</details>
### Anything else
Digging around the code, I found that there's a limit on the query the scheduler performs [here](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/scheduler_job.py#L928), which comes from [here](https://github.com/apache/airflow/blob/10023fdd65fa78033e7125d3d8103b63c127056e/airflow/jobs/scheduler_job.py#L1141), and which actually seems to be derived from the global `parallelism` value.
So what actually happens is: the scheduler queries the DB with a limit, gets back a partial list of tasks that cannot actually be executed because of the dag-level concurrency, and only gets to the other runnable tasks when there's a window between the big DAGs' executions. Increasing `parallelism` to 1024 solved the issue in our case.
The `parallelism` parameter is very confusing in this case: it is supposed to cap the number of tasks that can run concurrently, but it actually limits the scheduler's ability to move tasks from scheduled to queued...
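To make the mechanism concrete, here is a minimal, self-contained sketch (plain Python, not Airflow's actual code; the DAG names and the assumption that the query limit equals `parallelism` are simplifications) of how a LIMIT-ed query combined with a post-query dag-level concurrency check starves the small DAG:
```python
from collections import Counter

# Simplification: treat core.parallelism directly as the SQL LIMIT of the
# scheduler's "tasks up for execution" query.
parallelism = 128
query_limit = parallelism

# 150 scheduled tasks of a big DAG happen to sort ahead of the single
# ready task of the small DAG.
scheduled = [('big_dag', f'python_{i}') for i in range(150)]
scheduled += [('small_dag', 'python_0')]

dag_concurrency = {'big_dag': 10, 'small_dag': 1}  # dag-level `concurrency`
running = Counter()

candidates = scheduled[:query_limit]  # the capped query result
queued = []
for dag_id, task_id in candidates:
    # dag-level concurrency is only enforced *after* the limited query
    if running[dag_id] < dag_concurrency[dag_id]:
        running[dag_id] += 1
        queued.append((dag_id, task_id))

print(len(queued))                               # 10 -- only big_dag tasks
print(any(d == 'small_dag' for d, _ in queued))  # False: small_dag starves
# With parallelism = 1024 the limit covers all 151 candidates and the small
# DAG's task gets queued, matching the workaround described above.
```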
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)