GitHub user Asquator edited a comment on the discussion: Redesign the scheduler 
logic to avoid starvation due to dropped tasks in critical section

A good starting point is to clarify the ultimate goal we're striving to achieve. So far, always picking `max_tis` tasks whenever possible sounds sane, as there is no obvious reason not to schedule a ready task. Or is there? If there is such a reason, we have to state it explicitly.
Currently, the reasons for dropping tasks are the [following 
filters](https://github.com/apache/airflow/blob/1781dc9d5fcbb93bce7018a4050ddd36ec1a65d5/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L339C9-L341C81):
```py
starved_dags: set[str] = set()
starved_tasks: set[tuple[str, str]] = set()
starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] = set()
```

Note that these start out empty and are only updated dynamically in the "rare" case that we reach a second iteration of the loop. One of them is an exception:
```py
starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
```

It is computed up front because it checks the edge case of a fully occupied pool. That doesn't buy us much, though: each task specifies its own priority, and the weights are unrelated to pools or to any other concurrency limit in our model. In certain configurations this condition will never be met (assume `sum(task.pool_slots) != pool["total_slots"]` for some pool), so `starved_pools` joins the party.

Now we have at least 4 conditions that cause tasks to be dropped. We could enhance the query to window over every possible filter (or some subset of them) and avoid looping entirely, knowing that the query returns only schedulable tasks, though windowing over numerical values may run into cardinality issues and slow SQL performance.
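As a rough illustration of what windowing over one such filter could look like, here is a sketch using SQLAlchemy window functions; the `ReadyTask` model and its columns are simplified assumptions, not the real `task_instance` schema, and a real version would also have to subtract already-running counts:
```py
# Hypothetical sketch: enforce a per-DAG concurrency cap inside the query,
# so over-limit tasks are filtered out in SQL instead of being dropped
# in the Python loop after the fact. All names are illustrative.
from sqlalchemy import Column, Integer, String, func, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class ReadyTask(Base):
    __tablename__ = "ready_task"  # stand-in for the real task_instance table
    id = Column(Integer, primary_key=True)
    dag_id = Column(String)
    priority_weight = Column(Integer)
    max_active_tasks = Column(Integer)  # per-DAG cap, denormalized for the sketch

# Rank tasks within each DAG by priority.
ranked = select(
    ReadyTask.id,
    ReadyTask.priority_weight,
    ReadyTask.max_active_tasks,
    func.row_number()
    .over(partition_by=ReadyTask.dag_id, order_by=ReadyTask.priority_weight.desc())
    .label("rn"),
).subquery()

# Keep only tasks whose rank fits under the DAG's cap, then take the
# global top max_tis by priority -- no post-query dropping needed.
max_tis = 32
query = (
    select(ranked.c.id)
    .where(ranked.c.rn <= ranked.c.max_active_tasks)
    .order_by(ranked.c.priority_weight.desc())
    .limit(max_tis)
)
```
Each additional limit (pool slots, per-dagrun caps) would add another window, which is exactly where the cardinality and performance concerns come from.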

To explain it better, consider the following scenario:
- Dag A: 10000 tasks with concurrency 5
- Dag B: 10 tasks with concurrency 10
- Same pool, same priority

In the first scheduler iteration, 5 tasks from Dag A are scheduled and `max_tis - 5` are dropped. In subsequent iterations, we'll have the chance to schedule tasks from Dag B. With the current implementation, assuming one A slot frees up in every scheduler run, and if we window just by pool and priority but not by concurrency, the chance of picking B tasks is small and we'll spin through several hundred iterations, dropping A tasks and waiting for random mercy, while technically being able to schedule B tasks (see the toy simulation below). The same phenomenon can occur when a DAG with a low `max_active_tis_per_dagrun` fires a lot of mapped tasks, most of which will be dropped over the next loop runs.
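To make the effect tangible, here's a toy simulation (pure Python, every number an assumption) of a scheduler that picks `max_tis` candidates without accounting for concurrency and then drops the over-cap ones:
```py
import random

# Toy model of the scenario above: Dag A has 10000 ready tasks with
# concurrency 5, Dag B has 10 ready tasks with concurrency 10, same
# pool and priority, so candidate selection is effectively random.
ready = {"A": 10_000, "B": 10}
caps = {"A": 5, "B": 10}
running = {"A": 0, "B": 0}
max_tis = 32
iterations = 0

while ready["B"] > 0:
    iterations += 1
    # One A slot frees up each scheduler run, as assumed above.
    running["A"] = max(0, running["A"] - 1)
    # Candidate selection ignores concurrency caps entirely.
    pool = ["A"] * ready["A"] + ["B"] * ready["B"]
    for dag in random.sample(pool, min(max_tis, len(pool))):
        if running[dag] < caps[dag] and ready[dag] > 0:
            running[dag] += 1
            ready[dag] -= 1
        # else: the task is dropped for this critical section

print(f"Dag B fully scheduled after {iterations} iterations")
```
Runs typically take several hundred iterations before all 10 B tasks make it through, even though nothing ever prevents them from being scheduled.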

Dropped tasks inevitably cause scheduling delays, and although Airflow is not a real-time system, this effect doesn't look desirable, with plenty of reproducible edge cases that occur in real data systems. I wonder if there is agreement in the community on `max_tis == desired_tis` (at least logically), so we can formally model the desired mechanism.

The main problem here is the scheduling mechanism's inability to handle all the concurrency policies correctly, due to task dropping and wasted critical sections. Currently the only practical remedy is to increase the number of schedulers, until we hit the bottleneck of the critical section itself, which cannot withstand the heavy load. We could compute the `starved_` filters before the `select` query (which is basically windowing), instead of detecting violations each time after the query has fired. Another path is to relax the parameters of the system (e.g. transforming priorities into weights) to make the query lighter. Either way, benchmarking is required (a sketch of the first idea follows).
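A minimal sketch of the "compute the `starved_` filters up front" path, assuming hypothetical `RunningTI` and `DagLimit` mapped classes (the real scheduler would derive this from `task_instance` state and the concurrency maps it already tracks):
```py
from sqlalchemy import func, select
from sqlalchemy.orm import Session

# Hypothetical helper: find DAGs already at their concurrency cap
# *before* issuing the main task-selection query, so their tasks never
# enter the candidate set and never waste critical-section slots.
def compute_starved_dags(session: Session, RunningTI, DagLimit) -> set[str]:
    # Count running task instances per dag_id.
    counts = (
        select(RunningTI.dag_id, func.count().label("n"))
        .group_by(RunningTI.dag_id)
        .subquery()
    )
    rows = session.execute(
        select(counts.c.dag_id)
        .join(DagLimit, DagLimit.dag_id == counts.c.dag_id)
        .where(counts.c.n >= DagLimit.max_active_tasks)
    )
    return {dag_id for (dag_id,) in rows}

# The main selection query could then exclude these DAGs from the start,
# e.g. query.where(TI.dag_id.not_in(compute_starved_dags(...)))
```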

GitHub link: 
https://github.com/apache/airflow/discussions/49160#discussioncomment-12820540
