[
https://issues.apache.org/jira/browse/AIRFLOW-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17009866#comment-17009866
]
Luke Bodeen commented on AIRFLOW-1510:
--------------------------------------
So, if I read and interpret the above correctly,
[DagFileProcessorManager|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/utils/dag_processing.py#L298]
_should_ probably start looking at priority_weight as well?
Does anyone agree or disagree?
> Scheduler: priority_weight sorting not applied across multiple DAGs
> -------------------------------------------------------------------
>
> Key: AIRFLOW-1510
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1510
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.8.1
> Environment: Ubuntu, CeleryRunner
> Reporter: Joseph Harris
> Priority: Major
>
> h3. Issue
> When there are multiple available tasks across many DAGs, the order in which
> those tasks are queued does not respect the priority_weight ordering across
> all available tasks. When the number of DAGs is greater than the scheduler's
> 'max_threads' setting, the run order instead depends on how quickly the
> scheduler loop reaches each DAG.
> This is particularly problematic when long-running tasks compete for a
> limited number of slots in a pool.
> With over 80 DAGs in operation, increasing max_threads to that number
> doesn't seem like a practical solution.
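> For illustration, a minimal sketch of the kind of setup that triggers this
> (dag ids, dates, and commands here are hypothetical, not from this report;
> the operator arguments follow the Airflow 1.8 API):
> {code:python}
> from datetime import datetime
> from airflow import DAG
> from airflow.operators.bash_operator import BashOperator
>
> # Two of many DAGs whose tasks all compete for the same pool.
> dag_a = DAG('A', start_date=datetime(2017, 8, 1), schedule_interval='@daily')
> dag_d = DAG('D', start_date=datetime(2017, 8, 1), schedule_interval='@daily')
>
> # Long-running, lower-priority task in DAG A.
> BashOperator(task_id='t1', bash_command='sleep 3600', pool='pool',
>              priority_weight=20, dag=dag_a)
>
> # Higher-priority task in DAG D; it should be queued first, but may not be.
> BashOperator(task_id='t4', bash_command='sleep 3600', pool='pool',
>              priority_weight=100, dag=dag_d)
> {code}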
> h3. What should be done
> * The docs should be updated to be less misleading about how priority_weight
> is likely to behave: https://airflow.incubator.apache.org/concepts.html#pools
> * Potential implementation improvements to the scheduler: force the
> ProcessorManager to wait for all jobs to be processed (slow but reliable),
> or make _execute_task_instances() look at tasks from all DAGs (faster but
> less reliable; see the sketch after this list).
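> A hedged sketch of the second option, assuming it lands in the 1.8.1 task
> selection step (the function name and query shape here are illustrative,
> not a patch):
> {code:python}
> from airflow.models import TaskInstance as TI
> from airflow.utils.state import State
>
> def find_executable_task_instances(session):
>     # Select scheduled task instances from *all* DAGs, not only the DAGs
>     # returned by the last heartbeat(), and order them globally by priority.
>     return (session.query(TI)
>             .filter(TI.state == State.SCHEDULED)
>             .order_by(TI.priority_weight.desc(), TI.execution_date)
>             .all())
> {code}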
> h3. Example
> For instance, with 4 tasks:
> || DAG || Task || Priority || Pool ||
> | A | 1 | 20 | pool |
> | B | 2 | 1 | pool |
> | C | 3 | 1 | pool |
> | D | 4 | 100 | pool |
> With max_threads = 2, the scheduler would look at DAGs A & B first and send
> their tasks in order (1, 2). It would then look at DAGs C & D and send those
> tasks in order (4, 3), assuming enough pool slots are available. A global
> priority_weight ordering would instead have sent (4, 1, 2, 3).
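> A plain-Python illustration of this batching (numbers from the table above;
> the batch size stands in for max_threads):
> {code:python}
> # Tasks as (dag, task, priority). The sort by priority happens only
> # *within* each batch of max_threads DAGs, never across all of them.
> tasks = [('A', 1, 20), ('B', 2, 1), ('C', 3, 1), ('D', 4, 100)]
> max_threads = 2
>
> queued = []
> for i in range(0, len(tasks), max_threads):
>     batch = tasks[i:i + max_threads]
>     queued += sorted(batch, key=lambda t: -t[2])  # per-batch sort only
>
> print([t[1] for t in queued])  # [1, 2, 4, 3], not the global [4, 1, 2, 3]
> {code}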
> h3. Current Implementation Detail
> The SchedulerJob code is a bit complex, but the sequence of events in the
> scheduler loop looks like this:
> * The
> [DagFileProcessorManager|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/utils/dag_processing.py#L298]
> loops across all discovered DAG files and launches Processor threads
> (limited by max_threads). Each Processor reads in a single DAG and checks
> whether any tasks in that DAG have their dependencies met; if so, the task
> is set to state='scheduled'.
> *
> [DagFileProcessorManager.heartbeat()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1409]
> returns the list of DAGs that the Processor threads finished during the
> last cycle. When max_threads = 2, this list contains at most 2 DAGs.
> * These DAGs are [passed to
> SchedulerJob._execute_task_instances()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1440]
> * An [ORM
> query|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L992]
> selects tasks where state='scheduled' and whose DAG is in the list returned
> by the last heartbeat() loop. *Only those tasks are [sorted and passed to
> the Celery
> queue|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1035]*
> (a sketch of this per-batch restriction follows below).
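> A hedged sketch of the restriction in that last step (names follow the
> 1.8.1 code, but the query shape is illustrative):
> {code:python}
> from airflow.models import TaskInstance as TI
> from airflow.utils.state import State
>
> def scheduled_tasks_from_last_heartbeat(session, heartbeat_dag_ids):
>     # Only tasks from the (at most max_threads) DAGs in the last heartbeat
>     # are selected, so the priority sort never sees the remaining DAGs.
>     return (session.query(TI)
>             .filter(TI.dag_id.in_(heartbeat_dag_ids))
>             .filter(TI.state == State.SCHEDULED)
>             .all())
> {code}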
--
This message was sent by Atlassian Jira
(v8.3.4#803005)