pulsar314 opened a new issue #9316: URL: https://github.com/apache/airflow/issues/9316
**Apache Airflow version**: 1.10.10 (from PyPI)

**Environment**:
- **OS**: Ubuntu 18.04.4 LTS
- **Database**: PostgreSQL

**What happened**:

First, Airflow sums the required slots across all queued tasks. As a result, if the total number of slots required across a DAG exceeds the pool's capacity, all of the tasks fail. Here is a log for three tasks requiring 12 slots (TASK_A in the logs below), 12 slots (TASK_B), and 16 slots (TASK_C), with a pool of 24 slots: [deps_issue.log](https://github.com/apache/airflow/files/4781470/deps_issue.log)

The second issue follows from the first. The tasks are sent back for rescheduling because the pool limit check failed, but at the same time the corresponding runs are reported as finished. Airflow has a mechanism that fails tasks stuck in the queued state once their runs have been reported as finished. It is meant to help with tasks stuck in the queued state, but here it prevents the tasks from ever being rescheduled. Here is a combined log across components: [failure_issue.log](https://github.com/apache/airflow/files/4781482/failure_issue.log)

**What you expected to happen**:

Airflow executes the tasks in two stages: 12 + 12, then 16 (or vice versa).

**How to reproduce it**:

1. Create a pool with capacity 24
2. Create a DAG with three tasks assigned to the pool, requiring 12, 12, and 16 slots
3. Trigger the DAG

**Anything else we need to know**:

Each task on its own fits within the pool's capacity, so every task should be able to run. The issue is observed with both the Local and Celery executors.
An example of a DAG that fails:

```python
# coding: utf-8
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'schedule_interval': None,
}

dag = DAG(
    'THE_DAG',
    default_args=default_args,
    schedule_interval=None,
    concurrency=10,
)


def op(**kwargs):
    print('Hello!')


with dag:
    PythonOperator(
        task_id='TASK_A',
        provide_context=True,
        python_callable=op,
        pool='THE_POOL',
        pool_slots=12,
    )
    PythonOperator(
        task_id='TASK_B',
        provide_context=True,
        python_callable=op,
        pool='THE_POOL',
        pool_slots=12,
    )
    PythonOperator(
        task_id='TASK_C',
        provide_context=True,
        python_callable=op,
        pool='THE_POOL',
        pool_slots=16,
    )
```
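To make the expected behavior concrete, here is a minimal sketch (not Airflow code, just an illustration of the expectation above): a per-task check admits each queued task that fits the pool's remaining open slots, so the three tasks run in two rounds, whereas summing all queued tasks' slots against the capacity (12 + 12 + 16 = 40 > 24) rejects everything at once. The function name and structure are hypothetical.

```python
def schedule_rounds(capacity, queued_slots):
    """Greedy sketch: each round, start every queued task whose slot
    requirement fits into the pool's remaining open slots."""
    rounds = []
    queued = list(queued_slots)
    while queued:
        open_slots = capacity
        started, still_queued = [], []
        for slots in queued:
            if slots <= open_slots:
                started.append(slots)
                open_slots -= slots
            else:
                still_queued.append(slots)
        if not started:
            # A single task needs more slots than the pool has in total.
            raise ValueError("task requires more slots than pool capacity")
        rounds.append(started)
        queued = still_queued
    return rounds


# The scenario from this issue: pool of 24 slots, tasks needing 12, 12, 16.
print(schedule_rounds(24, [12, 12, 16]))  # [[12, 12], [16]] — two stages
```

Under this per-task check, no task fails: TASK_A and TASK_B share the pool first, and TASK_C runs once they release their slots.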
