[ https://issues.apache.org/jira/browse/AIRFLOW-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010905#comment-17010905 ]
Luke Bodeen commented on AIRFLOW-1510:
--------------------------------------

[~ash] mentioned de-coupling DAG parsing from scheduling, but that is a much larger effort, one which might already be in flight?

> Scheduler: priority_weight sorting not applied across multiple DAGs
> -------------------------------------------------------------------
>
> Key: AIRFLOW-1510
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1510
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.8.1
> Environment: Ubuntu, CeleryRunner
> Reporter: Joseph Harris
> Priority: Major
>
> h3. Issue
> When there are multiple available tasks across many DAGs, the order in which those tasks are queued does not respect the priority_weight ordering across all available tasks. When the number of DAGs is greater than the scheduler's 'max_threads' setting, the run order instead depends on how quickly the scheduler loop reaches each DAG.
> This is particularly problematic when long-running tasks compete over limited slots in a pool.
> With over 80 DAGs in operation, increasing max_threads to that number does not seem like a practical solution.
> h3. What should be done
> * The docs should be updated to be less misleading about how priority_weight is likely to behave: https://airflow.incubator.apache.org/concepts.html#pools
> * Potential implementation improvements on the scheduler: force the ProcessorManager to wait for all jobs to be processed (slow but reliable), or make _execute_task_instances() look at tasks from all DAGs (faster but less reliable).
> h3. Example
> For instance, with 4 tasks:
> || DAG || Task || Priority || Pool ||
> | A | 1 | 20 | pool |
> | B | 2 | 1 | pool |
> | C | 3 | 1 | pool |
> | D | 4 | 100 | pool |
> The scheduler would look at DAGs A & B first and send the tasks in order (1, 2). Then the scheduler would look at DAGs C & D and send these in order (4, 3), if there are enough pool slots available.
>
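The mis-ordering in the example above can be reproduced with a short Python sketch. The `Task` class, `max_threads` variable, and the batch loop are illustrative stand-ins, not Airflow internals: the point is only that sorting within per-heartbeat batches of DAGs produces a different queue order than a single global sort by priority_weight.

```python
from dataclasses import dataclass

@dataclass
class Task:
    dag: str
    task_id: int
    priority_weight: int

# The four tasks from the example table.
tasks = [
    Task("A", 1, 20),
    Task("B", 2, 1),
    Task("C", 3, 1),
    Task("D", 4, 100),
]

max_threads = 2  # the scheduler only sees this many DAGs per heartbeat

# Actual behavior: tasks are sorted only within each heartbeat batch.
batched_order = []
for i in range(0, len(tasks), max_threads):
    batch = sorted(tasks[i:i + max_threads],
                   key=lambda t: -t.priority_weight)
    batched_order.extend(t.task_id for t in batch)

# Expected behavior: one sort across all runnable tasks.
global_order = [t.task_id
                for t in sorted(tasks, key=lambda t: -t.priority_weight)]

print(batched_order)  # [1, 2, 4, 3]
print(global_order)   # [4, 1, 2, 3]
```

Task 4 (priority 100) should run first under a global sort, but in the batched ordering it is queued third, behind two lower-priority tasks that happened to be in the first heartbeat's DAGs.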
> h3. Current Implementation Detail
> The SchedulerJob code is a bit complex, but the sequence of events in the scheduler loop looks like this:
> * The [DagFileProcessorManager|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/utils/dag_processing.py#L298] loops across all discovered DAG files and launches Processor threads (limited by max_threads). Each processor reads in a single DAG and checks whether any tasks in the DAG have their dependencies met. If so, the task is set to state='scheduled'.
> * [DagFileProcessorManager.heartbeat()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1409] returns the list of DAGs produced by the Processor threads during its last cycle. When max_threads = 2, this list contains at most 2 DAGs.
> * These DAGs are [passed to SchedulerJob._execute_task_instances()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1440].
> * An [ORM query|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L992] selects tasks where state='scheduled' and the task belongs to one of the DAGs returned by the last heartbeat() loop. *Only those tasks are [sorted and passed to the Celery queue|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1035].*

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
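The restriction described in the last bullet — and the second proposed fix — can be sketched as follows. This is not the actual SchedulerJob ORM code; the dict records, `last_heartbeat_dags` set, and both queries are illustrative stand-ins showing why limiting the query to the last heartbeat's DAGs defeats global priority_weight sorting.

```python
# All 'scheduled' task instances across every DAG (illustrative data,
# matching the example table in the issue).
scheduled = [
    {"dag_id": "A", "task_id": 1, "priority_weight": 20},
    {"dag_id": "B", "task_id": 2, "priority_weight": 1},
    {"dag_id": "C", "task_id": 3, "priority_weight": 1},
    {"dag_id": "D", "task_id": 4, "priority_weight": 100},
]

# With max_threads = 2, the last heartbeat() only returned two DAGs.
last_heartbeat_dags = {"A", "B"}

# Current behavior: the query is filtered to those DAGs, so the sort
# never sees task 4 (priority 100) in DAG D.
current = sorted(
    (t for t in scheduled if t["dag_id"] in last_heartbeat_dags),
    key=lambda t: -t["priority_weight"],
)

# Proposed fix: select across all DAGs and sort once, so high-priority
# tasks in not-yet-reached DAGs are not starved.
proposed = sorted(scheduled, key=lambda t: -t["priority_weight"])

print([t["task_id"] for t in current])   # [1, 2]
print([t["task_id"] for t in proposed])  # [4, 1, 2, 3]
```

Under the per-heartbeat filter, tasks 1 and 2 consume the pool slots before task 4 is ever considered; the cross-DAG query would queue task 4 first.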