Joseph Harris created AIRFLOW-1510:
--------------------------------------
Summary: Scheduler: priority_weight sorting not applied across
multiple DAGs
Key: AIRFLOW-1510
URL: https://issues.apache.org/jira/browse/AIRFLOW-1510
Project: Apache Airflow
Issue Type: Bug
Components: scheduler
Affects Versions: 1.8.1
Environment: Ubuntu, CeleryRunner
Reporter: Joseph Harris
h3. Issue
When there are multiple runnable tasks across many DAGs, the order in which
those tasks are queued does not respect the priority_weight order across all
available tasks. When the number of DAGs is greater than the scheduler's
'max_threads' setting, the run order instead depends on how quickly the
scheduler loop reaches each DAG.
This is particularly problematic when long-running tasks compete for a limited
number of slots in a pool.
With over 80 DAGs in operation, increasing max_threads to that number doesn't
seem like a practical solution.
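For reference, max_threads is the scheduler setting mentioned above; in a stock 1.8.x airflow.cfg it sits under the scheduler section and defaults to 2:
{code}
[scheduler]
# Number of DagFileProcessor threads the scheduler runs in parallel.
# When there are more DAG files than threads, only a subset of DAGs is
# examined per scheduler loop, which is what triggers the ordering problem
# described above.
max_threads = 2
{code}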
h3. What should be done
* The docs should be updated to be less misleading about how priority_weight is
likely to behave: https://airflow.incubator.apache.org/concepts.html#pools
* Potential implementation improvements to the scheduler: force the
DagFileProcessorManager to wait for all DAG files to be processed before tasks
are queued (slow but reliable), or make _execute_task_instances() consider
scheduled tasks from all DAGs rather than only those from the last heartbeat
(faster but less reliable). A rough sketch of the second option follows below.
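A minimal, hypothetical sketch of the second option, assuming the candidate-selection query were widened to all DAGs (this is not the current implementation; the real method would still need pool and concurrency checks):
{code:python}
# Hypothetical sketch only -- not the current Airflow code. It shows what
# "consider scheduled tasks from all DAGs" could mean: select candidates by
# state and global priority_weight instead of restricting to the DAGs from
# the last heartbeat.
from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

session = settings.Session()
candidates = (
    session.query(TaskInstance)
    .filter(TaskInstance.state == State.SCHEDULED)   # no per-heartbeat DAG filter
    .order_by(TaskInstance.priority_weight.desc())   # global priority order
    .all()
)
session.close()
# The real _execute_task_instances() would still have to enforce pool slots,
# DAG concurrency and executor limits before queuing any of these candidates.
{code}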
h3. Example
For instance, with 4 tasks:
|| DAG || Task || Priority || Pool ||
| A | 1 | 20 | pool |
| B | 2 | 1 | pool |
| C | 3 | 1 | pool |
| D | 4 | 100 | pool |
With max_threads = 2, the scheduler would look at DAGs A & B first and send
their tasks in order (1, 2). Only afterwards would it look at DAGs C & D and
send those in order (4, 3), if there are enough pool slots available, even
though task 4 has the highest priority_weight and should have been sent first.
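For illustration only, DAG A / task 1 from the table might be defined roughly like this (made-up file; B, C and D would be analogous, differing only in dag_id, task_id and priority_weight):
{code:python}
# Illustrative sketch only -- not from the reporter's environment.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='A',
    start_date=datetime(2017, 1, 1),
    schedule_interval='@daily',
)

task_1 = BashOperator(
    task_id='task_1',
    bash_command='sleep 600',   # long-running task that holds a pool slot
    priority_weight=20,         # DAG D's task 4 would use 100, etc.
    pool='pool',                # all four tasks compete for this pool
    dag=dag,
)
{code}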
h3. Current Implementation Detail
The SchedulerJob code is a bit complex, but the sequence of events in the
scheduler loop looks like this:
* The
[DagFileProcessorManager|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/utils/dag_processing.py#L298]
loops across all discovered DAG files and launches Processor threads (limited
by max_threads). Each processor reads in a single DAG file and checks whether
any of its tasks have their dependencies met; those that do are set to
state='scheduled'.
*
[DagFileProcessorManager.heartbeat()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1409]
returns the list of DAGs processed by the Processor threads during its last
cycle. With max_threads = 2, this list will contain at most 2 DAGs.
* These DAGs are [passed to
SchedulerJob._execute_task_instances()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1440]
* An [ORM
query|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L992]
selects tasks with state='scheduled' whose DAG is among those returned by the
last heartbeat(). *Only those tasks are [sorted and passed to the Celery
queue|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1035]*
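Roughly paraphrasing the restriction described in the last two bullets (simplified, hypothetical names; see the linked lines for the actual code):
{code:python}
# Paraphrase of the restriction above -- simplified, not the exact Airflow
# 1.8.1 code. 'dag_ids_from_last_heartbeat' stands in for the small set of
# DAGs (at most max_threads of them) handed back by heartbeat().
from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

def find_queueable_tasks(dag_ids_from_last_heartbeat):
    session = settings.Session()
    try:
        return (
            session.query(TaskInstance)
            .filter(TaskInstance.state == State.SCHEDULED)
            # Only tasks from the DAGs in the last heartbeat are considered,
            # so the priority_weight sort that follows never sees tasks from
            # any other DAG, however high their priority.
            .filter(TaskInstance.dag_id.in_(dag_ids_from_last_heartbeat))
            .all()
        )
    finally:
        session.close()
{code}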