pulsar314 opened a new issue #9316:
URL: https://github.com/apache/airflow/issues/9316


   **Apache Airflow version**: 1.10.10 (from PyPI)
   
   
   **Environment**:
   
   - **OS**: Ubuntu 18.04.4 LTS
   - **Database**: PostgreSQL
   
   
   **What happened**:
   
   Firstly, Airflow sums the required slots over all queued tasks. As a 
result, if the total number of slots required across a dag exceeds the pool 
capacity, all of the tasks fail. Below is a log for three tasks requiring 12 
(TASK_A in the logs), 12 (TASK_B), and 16 (TASK_C) slots, with a pool of 24 slots: 
[deps_issue.log](https://github.com/apache/airflow/files/4781470/deps_issue.log)
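   As an illustration of the observed behavior (a hypothetical sketch, not Airflow's actual scheduler code): summing the slots of every queued task and comparing the total against the pool capacity rejects all three tasks, even though two of them would fit together.

   ```python
   # Hypothetical sketch of the observed behavior: the scheduler appears to
   # compare the SUM of slots over ALL queued tasks against the pool capacity.
   pool_capacity = 24
   queued = {'TASK_A': 12, 'TASK_B': 12, 'TASK_C': 16}

   total_required = sum(queued.values())  # 40, exceeds the capacity of 24
   rejected = list(queued) if total_required > pool_capacity else []
   print(total_required, rejected)  # every task is rejected, though 12 + 12 fits
   ```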
   
   The second issue follows from the first. The tasks are sent back to be 
rescheduled because the pool limit cannot be satisfied, but at the same time 
the corresponding runs are reported as finished. There is a mechanism that 
fails tasks still in the queued state once their run has been reported as 
finished. It is meant to help with tasks stuck in the queued state, but here 
it prevents the tasks from ever being rescheduled. Here is a combined log 
across components: 
[failure_issue.log](https://github.com/apache/airflow/files/4781482/failure_issue.log)
   
   
   **What you expected to happen**:
   
   Airflow executes the tasks in two stages: 12 + 12 first, then 16 (or vice versa).
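   The expected behavior can be sketched as a greedy, per-task admission check (an assumption about the intended semantics, not Airflow's actual code): each queued task is admitted individually while enough free slots remain, which naturally yields two stages here.

   ```python
   # Greedy per-task admission sketch (assumed semantics, not Airflow code).
   def schedule_in_stages(tasks, capacity):
       """Group tasks into stages so each stage fits within the pool capacity."""
       stages = []
       pending = dict(tasks)
       while pending:
           free = capacity
           stage = []
           for name, slots in list(pending.items()):
               if slots <= free:
                   stage.append(name)
                   free -= slots
                   del pending[name]
           if not stage:  # a single task exceeds the capacity; avoid looping forever
               raise ValueError('task requires more slots than the pool provides')
           stages.append(stage)
       return stages

   print(schedule_in_stages({'TASK_A': 12, 'TASK_B': 12, 'TASK_C': 16}, 24))
   # → [['TASK_A', 'TASK_B'], ['TASK_C']]
   ```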
   
   
   **How to reproduce it**:
   
   1. Create a pool with a capacity of 24
   2. Create a dag with three tasks that are assigned to the pool and require 
12, 12, and 16 slots respectively
   3. Trigger the dag
   
   
   **Anything else we need to know**:
   
   
   All of the tasks should eventually be able to run. The issue is observed 
with both the Local and Celery executors.
   
   An example of a dag that fails:
   ```python
   # coding: utf-8
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   from airflow.utils.dates import days_ago
   
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': days_ago(1),
        # note: schedule_interval belongs on the DAG itself, not in default_args
    }
   
   dag = DAG(
       'THE_DAG',
       default_args=default_args,
       schedule_interval=None,
       concurrency=10,
   )
   
   
   def op(**kwargs):
       print('Hello!')
   
   
   with dag:
       PythonOperator(
           task_id='TASK_A',
           provide_context=True,
           python_callable=op,
           pool='THE_POOL',
           pool_slots=12,
       )
       PythonOperator(
           task_id='TASK_B',
           provide_context=True,
           python_callable=op,
           pool='THE_POOL',
           pool_slots=12,
       )
       PythonOperator(
           task_id='TASK_C',
           provide_context=True,
           python_callable=op,
           pool='THE_POOL',
           pool_slots=16,
       )
   ```
   

