GitHub user Asquator edited a comment on the discussion: Redesign the scheduler logic to avoid starvation due to dropped tasks in critical section
A good starting point is to clarify the ultimate goal we're striving for. So far, always picking `max_tis` tasks whenever possible sounds sane, as there is no obvious reason not to schedule a ready task. Or is there? If such a reason exists, we should state it explicitly. Currently, the reasons for dropping tasks are the [following filters](https://github.com/apache/airflow/blob/1781dc9d5fcbb93bce7018a4050ddd36ec1a65d5/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L339C9-L341C81):

```py
starved_dags: set[str] = set()
starved_tasks: set[tuple[str, str]] = set()
starved_tasks_task_dagrun_concurrency: set[tuple[str, str, str]] = set()
```

Note that they start out empty and are only updated dynamically in the "rare" case that we reach a second iteration. One of them is an exception:

```py
starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
```

Here we check the edge case of a fully occupied pool. It doesn't help much, as each task specifies its own priority, and the weights are unrelated to pools or any other concurrency limit in our model. In certain configurations this condition is never met (assume `sum(task.pool_slots) != pool["total_slots"]` for some pool), so `starved_pools` joins the party. That leaves us with at least four conditions that cause tasks to be dropped.

We can enhance the query to window over every possible filter (or some subset of them) and avoid looping at all, knowing that the query returns only the good tasks, though windowing over numerical values may get us into cardinality issues and slow SQL performance (see the first sketch at the end of this comment).

To explain it better, look at the following scenario:

- Dag A: 10000 tasks with concurrency 5
- Dag B: 10 tasks with concurrency 10
- Same pool, same priority

In the first scheduler iteration, 5 tasks from Dag A are scheduled and `max_tis - 5` are dropped. In subsequent iterations, we should have the chance to schedule tasks from Dag B. With the current implementation, assuming one A slot frees up in every scheduler run, and windowing just by pool and priority but not by concurrency, the chance of picking B tasks is small: we'll spin several hundred iterations, dropping A tasks and waiting for random mercy, while technically being able to schedule B tasks all along. The same phenomenon can occur with a DAG that has a low `max_active_tis_per_dagrun` and fires a lot of mapped tasks, most of which will be dropped over the next loop runs.

Dropped tasks inevitably delay scheduling, and even though Airflow is not a real-time system, this effect is hardly desirable, with plenty of reproducible edge cases that occur in real data systems. I wonder whether the community agrees on `max_tis == desired_tis` (at least logically), so we can formally model the desired mechanism.

The main problem here is the scheduling mechanism's inability to handle all the concurrency policies correctly, because it drops tasks and wastes critical sections. Currently the only practical workaround is to increase the number of schedulers until we hit the bottleneck of the critical section itself, which cannot withstand the heavy load.

We can try to compute the `starved_` filters before the `select` query (which is basically windowing), instead of detecting them each time after the query has fired (second sketch below). Another path is to relax the parameters of the system (like transforming priority to weights) to make the query lighter. Either way, benchmarking is required.
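To make the windowing idea concrete, here is a minimal sketch, not Airflow's actual query, of pushing two of the filters into the database with window functions. The caps `MAX_ACTIVE_TIS_PER_DAGRUN` and `POOL_OPEN_SLOTS` are illustrative constants standing in for the real per-dag and per-pool limits (a full implementation would join against the dag and pool tables), and already-running tasks are ignored for brevity:

```py
from sqlalchemy import func, select

from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import TaskInstanceState

MAX_ACTIVE_TIS_PER_DAGRUN = 5  # illustrative; really a per-dag setting
POOL_OPEN_SLOTS = 128          # illustrative; really a per-pool value

ranked = (
    select(
        TaskInstance.dag_id,
        TaskInstance.task_id,
        TaskInstance.run_id,
        # Rank per dag run: rows ranked past the dag-run concurrency cap
        # can be discarded inside the database instead of after the fetch.
        func.row_number()
        .over(
            partition_by=(TaskInstance.dag_id, TaskInstance.run_id),
            order_by=TaskInstance.priority_weight.desc(),
        )
        .label("rank_in_dagrun"),
        # Running sum of pool_slots per pool: the "numerical" window that
        # may hurt cardinality, since every row carries its own partial sum.
        func.sum(TaskInstance.pool_slots)
        .over(
            partition_by=TaskInstance.pool,
            order_by=TaskInstance.priority_weight.desc(),
        )
        .label("pool_slots_running"),
    )
    .where(TaskInstance.state == TaskInstanceState.SCHEDULED)
    .subquery()
)

# Only tasks that fit under both caps ever leave the database.
good_tis = select(ranked).where(
    ranked.c.rank_in_dagrun <= MAX_ACTIVE_TIS_PER_DAGRUN,
    ranked.c.pool_slots_running <= POOL_OPEN_SLOTS,
)
```

The running sum over `pool_slots` is exactly the numerical window mentioned above: the database has to materialize a per-row partial aggregate across the whole candidate set, which is where the cardinality and performance concerns come from.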
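And a rough sketch of the other path, computing a `starved_` filter before the critical-section query rather than discovering it afterwards. Here `session` is an open SQLAlchemy session and `concurrency_map` is a hypothetical dict of per-dag concurrency caps; both stand in for whatever the scheduler already has at hand:

```py
from sqlalchemy import func, select

from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import TaskInstanceState

# One grouped query for running-task counts per dag, executed *before*
# the main select, so starved dags can be excluded from it up front.
running_per_dag = dict(
    session.execute(
        select(TaskInstance.dag_id, func.count())
        .where(TaskInstance.state == TaskInstanceState.RUNNING)
        .group_by(TaskInstance.dag_id)
    ).all()
)

starved_dags = {
    dag_id
    for dag_id, running in running_per_dag.items()
    if running >= concurrency_map[dag_id]  # hypothetical per-dag cap
}

# The main query could then include
# .where(TaskInstance.dag_id.not_in(starved_dags))
# on its first pass, instead of learning about starvation after the fetch.
```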
GitHub link: https://github.com/apache/airflow/discussions/49160#discussioncomment-12820540