nitaibezerra opened a new issue #21473:
URL: https://github.com/apache/airflow/issues/21473


   ### Apache Airflow version
   
   2.2.2
   
   ### What happened
   
   I'm running two DAGs, each composed of 50 independent tasks. Let's call them 
DAG **A** and DAG **B**. DAG **A** has `max_active_tasks` set to 1, which means 
that only one of its tasks can run at a time. 
   
   When I run only DAG **B**, its 50 tasks execute properly, respecting the 
`AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG` parameter, which limits the number 
of tasks running at the same time per DAG (default 16). 
   
   But when I run DAGs **A** and **B** at the same time, all tasks of DAG **B** 
get stuck in the `scheduled` state. Meanwhile, DAG **A** keeps executing 
properly, running its tasks one by one and respecting the `max_active_tasks` parameter.
   
   It seems that **max_active_tasks**, used together with a **high number of 
scheduled tasks**, causes a side effect on other DAGs.
   
   Only after some of DAG **A**'s tasks have executed does Airflow start 
executing DAG **B**'s tasks. 
   
   One way to work around this bug is raising the `AIRFLOW__CORE__PARALLELISM` 
variable. Nevertheless, it looks like a logic bug in the scheduler.
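   For reference, a minimal sketch of the workaround as an environment variable 
(the value 128 is just an example, not a recommendation; tune it to your deployment):
   ```
   # Hypothetical example of raising the scheduler's global parallelism limit
   # (the default is 32) so the scheduling window can reach DAG B's tasks.
   export AIRFLOW__CORE__PARALLELISM=128
   ```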
   
   ### What you expected to happen
   
   Considering that `max_active_tasks` was set only in DAG **A**, DAG **B**'s 
tasks should start running as soon as they are scheduled.
   
   ### How to reproduce
   
   * Airflow 2.2.2 with `LocalExecutor` on Python `3.9`
   
   * DAG **A**
   ```
   from datetime import datetime
   import time
   
   from airflow.decorators import dag, task
   
   TOTAL_TASKS = 50
   
   @dag(schedule_interval=None,
        start_date=datetime(2022, 1, 15),
        default_args={'owner': 'nitai'},
        max_active_tasks=1)
   def DAG_A():
   
       @task()
       def sleeping_task():
           time.sleep(10)
   
       for _ in range(TOTAL_TASKS):
           sleeping_task()
   
   dag = DAG_A()
   ```
   
   * DAG **B**
   ```
   from datetime import datetime
   import time
   
   from airflow.decorators import dag, task
   
   TOTAL_TASKS = 50
   
   @dag(schedule_interval=None,
        start_date=datetime(2022, 1, 15),
        default_args={'owner': 'nitai'})
   def DAG_B():
   
       @task()
       def sleeping_task():
           time.sleep(10)
   
       for _ in range(TOTAL_TASKS):
           sleeping_task()
   
   dag = DAG_B()
   ```
   
   Start DAG **A** and after a few seconds start DAG **B**.
   
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==2.4.0
   apache-airflow-providers-celery==2.1.0
   apache-airflow-providers-cncf-kubernetes==2.1.0
   apache-airflow-providers-docker==2.3.0
   apache-airflow-providers-elasticsearch==2.1.0
   apache-airflow-providers-ftp==2.0.1
   apache-airflow-providers-google==6.1.0
   apache-airflow-providers-grpc==2.0.1
   apache-airflow-providers-hashicorp==2.1.1
   apache-airflow-providers-http==2.0.1
   apache-airflow-providers-imap==2.0.1
   apache-airflow-providers-microsoft-azure==3.3.0
   apache-airflow-providers-mongo==2.3.0
   apache-airflow-providers-mysql==2.1.1
   apache-airflow-providers-odbc==2.0.1
   apache-airflow-providers-postgres==2.3.0
   apache-airflow-providers-redis==2.0.1
   apache-airflow-providers-sendgrid==2.0.1
   apache-airflow-providers-sftp==2.2.0
   apache-airflow-providers-slack==4.1.0
   apache-airflow-providers-sqlite==2.0.1
   apache-airflow-providers-ssh==2.3.0
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   As I mentioned before, Airflow only starts executing DAG **B**'s tasks after 
some of DAG **A**'s tasks have completed. I set the number of tasks to 50 to 
make sure the bug keeps occurring for a while. The lowest number of tasks in 
DAG **A** that triggers this bug is 33: at the beginning, 1 task starts running 
and 32 are kept in the `scheduled` state. After that first task finishes (and 
another DAG **A** task starts running), DAG **B** acquires space to run, but 
only one task at that moment. As the number of DAG **A** tasks in the 
`scheduled` state decreases, the number of DAG **B** tasks running at the same 
time increases. 
   
   From this experiment I conclude that at some point the scheduler limits to 
32 the number of task instances it examines for the transition from 
`scheduled` to `queued`, and orders that query in a way that prioritizes DAG 
**A**'s scheduled tasks, which are forbidden to run due to `max_active_tasks`.
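   The hypothesis above can be sketched as a toy model. This is not Airflow 
code; the window size of 32 and the "DAG **A** first" ordering are assumptions 
derived from the observations in this report:
   ```
   # Toy model of the suspected scheduler behavior (not Airflow code).
   # Assumption: each scheduling loop examines at most PARALLELISM scheduled
   # task instances, with DAG A's tasks sorted first in the query.
   PARALLELISM = 32

   def queueable_dag_b_tasks(dag_a_scheduled: int, dag_b_scheduled: int) -> int:
       """How many DAG B tasks fit in the examination window this loop."""
       # DAG A's scheduled tasks fill the window first, even though
       # max_active_tasks=1 forbids all but one of them from running.
       window_used_by_a = min(dag_a_scheduled, PARALLELISM)
       return min(dag_b_scheduled, PARALLELISM - window_used_by_a)

   # 33 tasks in DAG A: 1 running, 32 scheduled -> DAG B is fully starved.
   print(queueable_dag_b_tasks(32, 50))  # 0
   # As DAG A drains, DAG B gradually gets window slots back.
   print(queueable_dag_b_tasks(20, 50))  # 12
   ```
   This matches the observed threshold of 33 tasks and the gradual recovery of 
DAG **B** as DAG **A** drains.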
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

