The idea proposed above is:
"Don't even create tasks that can't be run due to concurrency limits (except 
executor slots)"
OR (slightly weaker):
"Make concurrency limits (except executor slots) apply at the SCHEDULED state 
instead of QUEUED"

Let's reason logically about the worst cases and the bottlenecks here.

If we MUST maintain the per-task priorities as they're implemented today, then 
in the worst case we MUST go over ALL the candidate tasks. That's because WE 
MUST look from top to bottom every time, and we also WANT to avoid starvation 
of tasks at the bottom, if the first batch can't run due to concurrency limits. 
It follows that if we have global per-task priorities as we do, in the worst 
case we WILL check every candidate task (for mapped tasks we don't have to 
iterate over every mapped TI, as looking at the operator itself is enough). As 
micro-batching (fetchmany) is pretty slow when done in a loop, especially in 
python, we have the following question:

Is it acceptable to hold ALL of the individual tasks (operators, not including 
mapped duplicates) in memory at once?
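
To make the worst case concrete, here is a minimal sketch in plain Python (not 
Airflow code). The `Candidate` class, `select_runnable` and the pool names are 
hypothetical; the only assumption is that candidates are walked from highest to 
lowest priority and skipped when their pool has no free slots.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Candidate:
    task_id: str
    pool: str
    priority_weight: int
    pool_slots: int = 1


def select_runnable(candidates, free_slots, max_tis):
    """Walk candidates top to bottom by priority, skipping blocked ones."""
    free = dict(free_slots)  # copy so the caller's mapping is not mutated
    selected = []
    examined = 0
    for cand in sorted(candidates, key=lambda c: -c.priority_weight):
        examined += 1
        if free.get(cand.pool, 0) >= cand.pool_slots:
            free[cand.pool] -= cand.pool_slots
            selected.append(cand.task_id)
            if len(selected) == max_tis:
                break
    return selected, examined


# 1000 high-priority tasks sit in a pool with no free slots; one low-priority
# task sits in an idle pool. To avoid starving it, all 1001 must be examined.
candidates = [Candidate(f"busy_{i}", "full_pool", 10) for i in range(1000)]
candidates.append(Candidate("starved", "idle_pool", 1))
selected, examined = select_runnable(
    candidates, {"full_pool": 0, "idle_pool": 1}, max_tis=32
)
print(selected, examined)
```

The only runnable task is found last, so the scan cannot stop early without 
either breaking the priority order or starving that task.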

If we can give up the per-task priorities or make them weaker (I opened a 
discussion for that), we can do round-robin scheduling and avoid holding all at 
once in memory. Just do micro-batching across scheduler iterations and be good. 
The concurrency limits will be checked when creating tasks/transferring them to 
SCHEDULED, say per N DAG runs each time, while every DAG run will have the 
`last_scheduling_decision` defined.
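
A rough sketch of what such round-robin micro-batching could look like, again 
in plain Python rather than actual scheduler code. The `DagRun` class here is a 
stand-in, `next_batch` is a hypothetical helper, and an integer counter 
replaces the real timestamp stored in `last_scheduling_decision`.

```python
from dataclasses import dataclass
from itertools import count
from typing import Optional


@dataclass
class DagRun:
    run_id: str
    last_scheduling_decision: Optional[int] = None  # None = never examined


_clock = count(1)  # hypothetical logical clock standing in for a timestamp


def next_batch(dag_runs, batch_size):
    """Pick the least recently examined runs and stamp them as examined."""
    ordered = sorted(
        dag_runs,
        key=lambda r: (
            r.last_scheduling_decision is not None,  # never-examined runs first
            r.last_scheduling_decision or 0,         # then oldest decision first
        ),
    )
    batch = ordered[:batch_size]
    tick = next(_clock)
    for run in batch:
        run.last_scheduling_decision = tick
    return [run.run_id for run in batch]


runs = [DagRun(f"run_{i}") for i in range(5)]
first = next_batch(runs, 2)
second = next_batch(runs, 2)
third = next_batch(runs, 2)
print(first, second, third)
```

Each scheduler iteration only touches `batch_size` DAG runs, and every run is 
eventually reached because the least recently examined ones always come first.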

We'll probably want a strong priority limit one day, and defining it on task 
level is not desired. Say it's defined on DAG run level, then we'll have to 
actually subtract pool slots and other concurrency slots even if tasks are not 
actually scheduled, when we descend to a lower priority rank. The 
micro-batching on DAG run level looks appealing, as we can store the DAG runs 
in a local cache per scheduler and avoid repetitive queries to fetch the same 
DAG runs over and over again. The iterations will be done in python, and state 
will be preserved between scheduler iterations.
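
One possible reading of the slot subtraction above, as a plain-Python sketch. 
Everything here is an assumption made for illustration: the function name 
`allocate_by_rank`, the tuple layout, and the idea that each DAG run reports 
both how many slots it demands in total and how many of its tasks are ready 
right now.

```python
def allocate_by_rank(runs, pool_slots):
    """runs: iterable of (run_id, priority_rank, demanded_slots, ready_slots).

    Higher rank is served first. Slots are reserved for a higher rank even if
    its tasks are not ready yet, so lower ranks only see what is left over.
    """
    remaining = pool_slots
    schedulable = {}
    for run_id, _rank, demanded, ready in sorted(runs, key=lambda r: -r[1]):
        reserved = min(demanded, remaining)
        remaining -= reserved  # subtracted even for tasks not scheduled yet
        schedulable[run_id] = min(ready, reserved)
    return schedulable, remaining


runs = [
    ("low", 1, 5, 5),   # wants 5 slots, all 5 tasks ready
    ("high", 2, 8, 3),  # wants 8 slots, only 3 tasks ready right now
]
schedulable, remaining = allocate_by_rank(runs, pool_slots=10)
print(schedulable, remaining)
```

With 10 pool slots, the high-priority run reserves 8 although only 3 of its 
tasks can start now, so the low-priority run gets 2 slots instead of 5.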

So, rewriting it in code demands some drastic changes in scheduler logic and I 
believe they're possible, especially using the SCHEDULED state to sieve out 
tasks that can't be run due to concurrency limit. The QUEUED state will simply 
become a sieve for executor slots limits. Without this drastic change, the 
sprocs will remain a second-to-none solution - we can't both store thousands of 
TIs in the DB and expect to come up with a python solution.
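
The two sieves could be sketched roughly like this. It is plain Python with 
hypothetical names (`admit_to_scheduled`, `move_to_queued`), and it only models 
pool slots and a per-DAG limit, while the real set of concurrency limits is 
larger.

```python
from collections import Counter


def admit_to_scheduled(pending, pool_free, dag_limit, dag_active):
    """Sieve 1: only tasks that pass the concurrency limits become SCHEDULED."""
    pool_free = dict(pool_free)       # copies, so callers keep their state
    dag_active = Counter(dag_active)
    scheduled = []
    for dag_id, task_id, pool in pending:  # pending is already priority-sorted
        if pool_free.get(pool, 0) < 1 or dag_active[dag_id] >= dag_limit:
            continue  # the task is simply not created / not moved to SCHEDULED
        pool_free[pool] -= 1
        dag_active[dag_id] += 1
        scheduled.append((dag_id, task_id, pool))
    return scheduled


def move_to_queued(scheduled, executor_free_slots):
    """Sieve 2: the only remaining check is executor capacity."""
    return scheduled[:executor_free_slots]


pending = [("a", "t1", "p"), ("a", "t2", "p"), ("a", "t3", "p"), ("b", "t1", "p")]
scheduled = admit_to_scheduled(pending, {"p": 3}, dag_limit=2, dag_active={})
queued = move_to_queued(scheduled, executor_free_slots=2)
print(scheduled)
print(queued)
```

Here `a.t3` is rejected by the per-DAG limit and never reaches SCHEDULED, so 
the second step needs no loop and no concurrency checks at all.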

Caching DAG runs in memory means, again, that we HAVE TO keep as many RUNNING 
DAG runs as possible in memory between scheduler iterations, so that we avoid 
frequent round-trip queries, which are pretty painful, as we may examine 
thousands of DAG runs in every iteration.

The question restated:
How many concurrent active DAG runs should we prepare for, so we can decide 
whether they can be stored in memory?


> Hello, I have thought of a way that might work, in order to both fix
> starvation, and reduce complexity, while increasing maintainability.
> 
> 
> The main issue that causes starvation is that we have scheduled task
> instances which /cannot/ be run, and filtering them dynamically with SQL
> is not really possible without either significantly hindering
> performance or introducing query and maintainability complexity, just
> like the stored SQL procedures do. Though they do solve the issue, they
> create a new one where we have 3 implementations, one for each SQL
> dialect, which in my opinion is better avoided.
> 
> 
> The proposed solution is that in the dagrun 'update_state' function
> (which is part of a critical section) we only create tasks which can be
> set to running, using the new fields added, and moving the heavy part
> away from the task selection in the '_executable_tis_to_queued' method,
> where we will only have to do one query, no loop, and only check
> executor slots.
> 
> 
> To achieve this, we can use the 'concurrency map' created in sql in the
> pessimistic-task-selection-with-window-functions pr
> 
> (https://github.com/apache/airflow/pull/53492)
> 
> and use it to get simple and lightweight mappings, which will look like
> so (sorted by priority_order and DR logical date):
> 
> -------------------------
> 
> {dag_id} | {dagrun_id} | {task_id} | currently running mapped tasks (if any) | total running tis per dagrun | total tis per task run | pool_slots
> 
> --------------------------
> 
> And a second set of key-value pairs for pools:
> 
> {pool_id} | available_pool_slots
> 
> 
> And now, after we have sorted these fields, we can select the top N
> dagruns to have their state updated according to the order of the sorted
> table above, and to the limit for dagruns to examine each loop, and
> create task instances for those dagruns only, and we create as many
> tasks as we can, by priority, as long as the concurrency limits are
> preserved, this keeps the current behaviour, while examining more
> prioritized operators first, and then moving to less prioritized ones,
> we can also improve it by increasing the limit of dagruns to be examined
> or introducing a 'last_scheduling_decision' sorting as well so that we
> update the state of all dagruns in a round robin fashion rather than
> only getting the most prioritized ones first, however, this does break
> the current priority behaviour a little.
> 
> 
> This means that instead of taking a lock on the pools table during the
> task fetching, we take the lock on the pool while examining dagruns and
> updating their state, and during task fetching we lock only the
> 'task_instance' rows, meaning schedulers can now set tasks to running in
> parallel.
> 
> 
> There are a few other possible derivations of the same solution,
> including, looking at each dagrun individually and sending a query per
> dagrun, and thus, only locking the pool/pools the current dagrun's
> operators are assigned to, allowing multiple schedulers to do the same
> work on different dagruns as long as they are in different pools.
> 
> 
> Another derivation could be to just go over all dagruns with no limit
> until we cannot create new tasks instances that can be set to running
> (according to the concurrency limits), which will change the sorting
> order a little, where operator priority order comes AFTER dagrun logical
> date.
> 
> 
> In conclusion, this solution could be a viable option, it also
> simplifies the task selection process a lot and can help remove a lot of
> code, while making it easier to maintain, and using a simpler algorithm,
> and it also changes the semantics of the 'scheduled' state of the
> task_instance, where now, a 'scheduled' ti, can be sent to the executor
> immediately, and so tasks will not be in the 'scheduled' state for long.
> 
> 
> This proposed solution is, in my opinion the simplest solution, and most
> maintainable one, though I might be biased, as I have explored multiple
> solutions with Asquator, and I have come to the conclusion that solving
> it using SQL is not a viable option, I have yet to open the PR to
> airflow, and only have it locally on my computer as I am making a lot of
> experimental commits, and a lot of things are changing very quickly,
> once I have stabilized things a little, I will open the PR for anyone
> who's interested.
> 
> 
> I would appreciate any feedback or other ideas/propositions from the
> airflow community, both on this solution and its derivations or on any
> alternative solutions.
> 
> 
> On 9/24/25 9:11 PM, asquator wrote:
> 
> > It is a complication, but it seems as we can't do any better and remain 
> > scalable. In the end we want priorities enforced (mb not in the way they're 
> > implemented today, but it's part of another talk), and we don't know how 
> > many tasks we'll have to iterate over in advance, so fetching them into 
> > python is a death sentence in some situations (not joking, tried that with 
> > fetchmany and chunked streaming, it was way too slow).
> > 
> > I actually thought of another optimization here:
> > Instead of fetching the entire TI relation, we can ignore mapped tasks and 
> > only fetch individual tasks (operators), expanding them on the fly into the 
> > maximum number of TIs that can be created. And yet this approach is not 
> > scalable, as some enormous backfill of a DAG with just 10 tasks will make 
> > it fetch MBs of data every time. It's very slow and loads the DB server 
> > with heavy network requests.
> > 
> > Well, it's not just about throughput, but starvation of tasks that can't 
> > run for hours sometimes, and unfortunately we encounter this in production 
> > very often.
> > 
> > On Wednesday, September 24th, 2025 at 3:10 AM, Matthew 
> > [email protected] wrote:
> > 
> > > Hi,
> > > This seems like a significant level of technical complication/debt
> > > relative to even a 1.5x/2x gain (which as noted is only in certain
> > > workloads). Given airflow scheduling code in general is not something
> > > one could describe as simple, introducing large amounts of static code
> > > that lives in stored procs seems unwise. If at all possible, making this
> > > interface pluggable and provided via provider would be the saner approach
> > > in my opinion.
> > > 
> > > On Tue, Sep 23, 2025 at 11:16 AM [email protected] wrote:
> > > 
> > > > Hello,
> > > > 
> > > > A new approach utilizing stored SQL functions is proposed here as a
> > > > solution to unnecessary processing/starvation:
> > > > 
> > > > https://github.com/apache/airflow/pull/55537
> > > > 
> > > > Benchmarks show an actual improvement in queueing throughput, around
> > > > 1.5x-2x for the workloads tested.
> > > > 
> > > > Regarding DB server load, I wasn't able to note any difference so far,
> > > > we probably have to run heavier jobs to test that. Looks like a smooth
> > > > line to me.
> > > > 
> > > > ---------------------------------------------------------------------
> > > > To unsubscribe, e-mail:[email protected]
> > > > For additional commands, e-mail:[email protected]
