[ 
https://issues.apache.org/jira/browse/AIRFLOW-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010905#comment-17010905
 ] 

Luke Bodeen commented on AIRFLOW-1510:
--------------------------------------

[~ash] mentioned decoupling DAG parsing from scheduling, but that is a much 
larger effort, one which might already be in flight?

> Scheduler: priority_weight sorting not applied across multiple DAGs
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-1510
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1510
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.8.1
>         Environment: Ubuntu, CeleryRunner
>            Reporter: Joseph Harris
>            Priority: Major
>
> h3. Issue
> When there are multiple available tasks across many DAGs, the order in which 
> those tasks are queued does not respect the priority_weight order across 
> all available tasks. Instead, when the number of DAGs exceeds the scheduler's 
> 'max_threads' setting, the run order depends on how quickly the scheduler 
> loop reaches each DAG.
> This is particularly problematic when long-running tasks are competing for a 
> limited number of slots in a pool.
> With over 80 DAGs in operation, increasing max_threads to that number 
> doesn't seem like a practical solution.
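For reference, the thread limit discussed above is the max_threads setting under the [scheduler] section of airflow.cfg (the value shown here is illustrative, not a recommendation):

```ini
[scheduler]
# Maximum number of DAG-file processor threads run in parallel per
# scheduler loop; remaining DAG files wait for a later cycle.
max_threads = 2
```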
> h3. What should be done
> * The docs should be updated to be less misleading about how priority_weight 
> is likely to behave: https://airflow.incubator.apache.org/concepts.html#pools
> * Potential implementation improvements on the scheduler: force the 
> ProcessorManager to wait for all DAG files to be processed before queuing 
> (slow but reliable), or make _execute_task_instances() consider tasks from 
> all DAGs (faster but less reliable).
> h3. Example
> For instance, with 4 tasks:
> || DAG || Task || Priority || Pool ||
> | A | 1 | 20 | pool |
> | B | 2 | 1 | pool |
> | C | 3 | 1 | pool |
> | D | 4 | 100 | pool |
> With max_threads = 2, the scheduler would look at DAGs A & B first and queue 
> their tasks in order (1, 2). Then it would look at DAGs C & D and queue those 
> in order (4, 3), if enough pool slots are available. Task 4, despite having 
> the highest priority (100), is queued after tasks 1 and 2.
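The batching effect in the example above can be sketched in a few lines of Python (the names and structure here are illustrative, not the actual scheduler code):

```python
# Hypothetical sketch: sorting per heartbeat batch vs. one global sort.
tasks = {  # dag_id -> (task_id, priority_weight), per the table above
    "A": ("1", 20),
    "B": ("2", 1),
    "C": ("3", 1),
    "D": ("4", 100),
}

def schedule_in_batches(dag_order, max_threads):
    """Queue tasks batch by batch, sorting only within each batch."""
    queued = []
    for i in range(0, len(dag_order), max_threads):
        batch = dag_order[i:i + max_threads]
        # priority_weight sorting only sees this heartbeat's DAGs
        batch_tasks = sorted((tasks[d] for d in batch),
                             key=lambda t: t[1], reverse=True)
        queued.extend(t[0] for t in batch_tasks)
    return queued

print(schedule_in_batches(["A", "B", "C", "D"], max_threads=2))
# -> ['1', '2', '4', '3']  (task 4 queued third despite priority 100)

# A single sort across all available tasks would instead give:
print([t[0] for t in sorted(tasks.values(),
                            key=lambda t: t[1], reverse=True)])
# -> ['4', '1', '2', '3']
```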
> h3. Current Implementation Detail
> The SchedulerJob code is a bit complex, but the sequence of events in the 
> scheduler loop looks like this:
> * The 
> [DagFileProcessorManager|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/utils/dag_processing.py#L298]
>  loops across all discovered DAG files and launches Processor threads 
> (limited by max_threads). Each processor reads in a single DAG and checks 
> whether any tasks in the DAG have their dependencies met. If they do, the 
> task is set to state='scheduled'.
> * 
> [DagFileProcessorManager.heartbeat()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1409]
>  returns the list of DAGs that the Processor threads completed during the 
> last cycle. When max_threads = 2, this list will contain at most 2 DAGs.
> * These DAGs are [passed to 
> SchedulerJob._execute_task_instances()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1440]
> * An [ORM 
> query|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L992]
>  selects tasks where state='scheduled' and the task belongs to one of the 
> DAGs returned by the last heartbeat() loop. *Only those tasks are [sorted and 
> passed to the Celery 
> queue|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1035]*
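The effect of that filter-then-sort can be illustrated with a small sketch (TaskInstance and tasks_to_queue are hypothetical stand-ins, not the real SchedulerJob internals):

```python
from collections import namedtuple

# Simplified stand-in for the ORM rows the scheduler query returns.
TaskInstance = namedtuple("TaskInstance",
                          "dag_id task_id priority_weight state")

scheduled = [
    TaskInstance("A", "1", 20, "scheduled"),
    TaskInstance("B", "2", 1, "scheduled"),
    TaskInstance("C", "3", 1, "scheduled"),
    TaskInstance("D", "4", 100, "scheduled"),
]

def tasks_to_queue(task_instances, heartbeat_dag_ids):
    # Roughly: SELECT ... WHERE state = 'scheduled'
    #          AND dag_id IN (:heartbeat_dag_ids)
    #          ORDER BY priority_weight DESC
    eligible = [ti for ti in task_instances
                if ti.state == "scheduled" and ti.dag_id in heartbeat_dag_ids]
    return sorted(eligible, key=lambda ti: ti.priority_weight, reverse=True)

# With max_threads = 2, the first heartbeat only returns DAGs A and B,
# so task 4 (priority 100) is not even considered this cycle:
print([ti.task_id for ti in tasks_to_queue(scheduled, {"A", "B"})])
# -> ['1', '2']
```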



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
