Please please please: Give us DB benchmark figures. It almost doesnNothing else 
matters if this performance hits DBs too hard.

What about Daniel’s idea in the slack about adding a “last_scheduling_decision” 
or similar to TaskInstance too, and order by that so that we don’t just 
repeatedly hit the same TI? On the surface, that seems like a much more 
maintainable solution with 90-99% of the same net effect (that TI's that can’t 
get queued don’t starve out everything else) without a complex query and 
possible DB load, not to mention possible big differences between mysql and 
Postgres. 

> On 8 Aug 2025, at 11:34, asquator <asqua...@proton.me.INVALID> wrote:
> 
> #53492 status update:
> 
> We optimized the query significantly by abandoning the nesting of window 
> functions and joining on TI.id in the outer query. It's still a heavier query 
> than we had before for fetching tasks (with lighter python work), so 
> benchmarking the DB is required, but it's somewhat difficult because there 
> don't seem to be unified, agreed upon workloads for benchmarking the 
> scheduler. Running dynamic benchmarks with multiple deployments as suggested 
> by @ashb is challenging too due to high resource requirements. I remember 
> there was an AIP-59 for similar cases, but I'm not sure if it's fit here. 
> We're open for any suggestions as to how to advance.
> 
> Regarding #54103, I think it's difficult to maintain and extend just because 
> it solves just one out of four possible starvation cases. If it's merged, the 
> TI per DAG issue is solved forever, but TI per pool starvation (issue #45636) 
> will still be present. What this PR does is computing a lateral join on DAG 
> runs and ensuring the query never fetches more TIs for a DAG run than it can 
> run. It's roughly equivalent to one of the window functions in #53492. If we 
> want to also solve pool starvation, we'll have to add another lateral join. 
> It's all the same except performance, but let's be creative - we can reduce 
> the performance overhead of #53492 to that  of #54103 by simply computing 
> only one window per scheduler iteration, and it can be done in a round-robin 
> fashion (for windows), such that every #concurrency_limits = 4 iterations we 
> break starvation for at least one of the limits. This way we have the same 
> performance but solve all the cases, at least once in 4 iterations. If we see 
> that windows don't cause a big overhead, we can run two of them in every 
> iteration. This can be another configuration called scheduler.optimism_level 
> that defines how many window functions we include, while handling other 
> limits optimistically. This requires lots of coding and testing, but the idea 
> is clear.
> 
> I say we should handle all the starvation issues in a comprehensive, 
> encompassing logic change for the scheduler.
> 
> 
> On Thursday, August 7th, 2025 at 4:43 AM, Christos Bisias 
> <christos...@gmail.com> wrote:
> 
>>> You're talking about https://github.com/apache/airflow/pull/53492/ right?
>> 
>> 
>> Yes.
>> 
>>> Where is the PR from @Christos?
>> 
>> 
>> https://github.com/apache/airflow/pull/54103
>> 
>> 
>> 
>> On Wed, Aug 6, 2025 at 23:51 Daniel Standish
>> daniel.stand...@astronomer.io.invalid wrote:
>> 
>>>> IMO, the approach on the patch isn't easily maintainable. Most of the
>>>> calculations are performed by SQL in a huge query.
>>>> It would be my preference to have many smaller queries and do part of the
>>>> calculations in python. This will be easier to understand, maintain and
>>>> debug in the future. Also, it will be easier to unit test.
>>> 
>>> You're talking about https://github.com/apache/airflow/pull/53492/ right?
>>> I agree. I share the skepticism that it must be one big ugly query. At a
>>> minimum it needs a lot more work and refinement. Not something that should
>>> be merged in the current state, even as experimental.
>>> 
>>> Where is the PR from @Christos?
>>> 
>>> On Wed, Aug 6, 2025 at 12:54 PM Jens Scheffler <j_scheff...@gmx.de.invalid
>>> 
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I was (until now) not be able to re-read all the Slack discussion and
>>>> like to make this latest at the weekend. I also like Jarek fear that the
>>>> optimization makes the Scheduler rather hard to maintain. We also had
>>>> some points where we_thought_ we can contribute some optimizations
>>>> especially for Mapped Tasks and then considered the complexity of Mapped
>>>> Task Groups where the Depth-First Strategy would defeat all our drafted
>>>> optimizations. So also in our current apporach we are cutting down the
>>>> Dags in manageable pieces.
>>>> 
>>>> So far (I believ, but anybody correct me if I am wrong) the scaling was
>>>> always documented only with options, no real upper boundary (other than
>>>> soft limtis) existing in the code. So the delivered product never
>>>> confirmed fixed upper limits. It might be good also to consider that we
>>>> document where we know there are natural or structural boundaries. So
>>>> hope that I can read more details the next days.
>>>> 
>>>> Jens
>>>> 
>>>> On 06.08.25 10:31, Jarek Potiuk wrote:
>>>> 
>>>>>> My main issue and the topic of this thread, has been that the
>>>>>> scheduler
>>>>>> does unnecessary work that leads to decreased throughput. My solution
>>>>>> has
>>>>>> been to limit the results of the query to the dag cap of active tasks
>>>>>> that
>>>>>> the user has defined.
>>>>> 
>>>>> Yes. I understand that. There are situations that cause this
>>>>> "unnecessary
>>>>> work" to be excessive and lead to lower performance and more memory
>>>>> usage.
>>>>> This is quite "normal". No system in the world is optimized for all
>>>>> kinds
>>>>> of scenarios and sometimes you need to make trade-offs - for example
>>>>> lower
>>>>> performance and maintainability (and support for MySQL and Postgres as
>>>>> Ash
>>>>> pointed out in some other threads) which we have to make. There are
>>>>> various
>>>>> optimisation goals we can chase: optimal performance and no wasted
>>>>> resources in certain situations and configurations is one of (many)
>>>>> goals
>>>>> we might have. Other goals might include: easier maintainability,
>>>>> better
>>>>> community collaboration, simplicity, less code to maintain,
>>>>> testability,
>>>>> also (what I mentioned before) sometimes deliberate not handling
>>>>> certain
>>>>> scenarios and introducing friction might be deliberate decision we
>>>>> can
>>>>> take in order to push our users in the direction we want them to go.
>>>>> Yes.
>>>>> As community and maintainers we do not have to always "follow" our
>>>>> users
>>>>> behaviour - we can (and we often do) educate our users and show them
>>>>> better
>>>>> ways of doing things.
>>>>> 
>>>>> For example we had a LONG discussion whether to introduce caching of
>>>>> Variable values during Dag parsing - because we knew our users are
>>>>> often
>>>>> using Variables in top-level code of their Dags and this leads to a lot
>>>>> of
>>>>> waste and high CPU and I/O usage by Dag processor. We finally
>>>>> implemented
>>>>> it as an experimental feature, but it was not at all certain we will -
>>>>> we
>>>>> had to carefully consider what we are trading in exchange for that
>>>>> performance - and whether it's worth it.
>>>>> 
>>>>> Same here - I understand: there are some cases (arguably rather niche -
>>>>> with very large Dags) where scheduler does unnecessary processing and
>>>>> performance could be improved. Now - we need to understand what
>>>>> trade-offs
>>>>> we need to make as maintainers and community (including our users) if
>>>>> we
>>>>> want to address it. We need to know what complexity is involved,
>>>>> whether
>>>>> it
>>>>> will work with Postgres/MySQL and SQlite, whether we will be able to
>>>>> continue debugging and testing it. And whether we want to drive away
>>>>> our
>>>>> user from the modularisation strategy (smaller Dags) that we think
>>>>> makes
>>>>> more sense than bigger Dags. We have to think about what happens next.
>>>>> If
>>>>> we make "huge Dags" first-class-citizens, will it mean that we will
>>>>> have
>>>>> to
>>>>> redesign our UI to support them? What should we do when someone opens
>>>>> up
>>>>> an
>>>>> issue "I have this 1000000 task Dag and I cannot open Airflow UI - it
>>>>> crashes hard and makes my Airflow instance unusable - please fix it
>>>>> ASAP".
>>>>> I certainly would like to avoid such a situation to stress our friend
>>>>> maintainers who work on UI - so also they should have a say on how
>>>>> feasible
>>>>> it is to make it "easy" to have "huge Dags" for them.
>>>>> 
>>>>> All those factors should be taken into account when you make a
>>>>> "product"
>>>>> decision. Performance gains for particular cases is just one of many
>>>>> factors to consider - and often not the most important ones.
>>>>> 
>>>>> J.
>>>>> 
>>>>> On Wed, Aug 6, 2025 at 7:34 AM Christos Bisias christos...@gmail.com
>>>>> wrote:
>>>>> 
>>>>>> We also have a dag with dynamic task mapping that can grow immensely.
>>>>>> 
>>>>>> I've been looking at https://github.com/apache/airflow/pull/53492.
>>>>>> 
>>>>>> My main issue and the topic of this thread, has been that the
>>>>>> scheduler
>>>>>> does unnecessary work that leads to decreased throughput. My solution
>>>>>> has
>>>>>> been to limit the results of the query to the dag cap of active tasks
>>>>>> that
>>>>>> the user has defined.
>>>>>> 
>>>>>> The patch is more focused on the available pool slots. I get the idea
>>>>>> that
>>>>>> if we can only examine and queue as many tasks as available slots,
>>>>>> then
>>>>>> we
>>>>>> will be efficiently utilizing the available slots to the max, the
>>>>>> throughput will increase and my issue will be solved as well.
>>>>>> 
>>>>>> IMO, the approach on the patch isn't easily maintainable. Most of the
>>>>>> calculations are performed by SQL in a huge query.
>>>>>> 
>>>>>> It would be my preference to have many smaller queries and do part of
>>>>>> the
>>>>>> calculations in python. This will be easier to understand, maintain
>>>>>> and
>>>>>> debug in the future. Also, it will be easier to unit test.
>>>>>> 
>>>>>> On Tue, Aug 5, 2025 at 10:20 PM Jarek Potiuk ja...@potiuk.com
>>>>>> wrote:
>>>>>> 
>>>>>>> Just a comment here - I am also not opposed as well if optimizations
>>>>>>> will
>>>>>>> be implemented without impacting the more "regular"cases. And -
>>>>>>> important -
>>>>>>> without adding huge complexity.
>>>>>>> 
>>>>>>> The SQL queries I saw in recent PRs and discussions look both "smart"
>>>>>>> and
>>>>>>> "scary" at the same time. Optimizations like that tend to lead to
>>>>>>> obfuscated, difficult to understand and reason code and "smart"
>>>>>>> solutions -
>>>>>>> sometimes "too smart". And when it ends up with one or two people
>>>>>>> only
>>>>>>> being able to debug and fix problems connected with those, things
>>>>>>> become
>>>>>>> a
>>>>>>> little hairy. So whatever we do there, it must be not only
>>>>>>> "smart"
>>>>>>> but
>>>>>>> also easy to read and well tested - so that anyone can run the tests
>>>>>>> easily
>>>>>>> and reproduce potential failure cases.
>>>>>>> 
>>>>>>> And yes I know I am writing this as someone who - for years was the
>>>>>>> only
>>>>>>> one to understand our complex CI setup. But I think over the last two
>>>>>>> years
>>>>>>> we are definitely going into, simpler, easier to understand setup and
>>>>>>> we
>>>>>>> have more people on board who know how to deal with it and I think
>>>>>>> that
>>>>>>> is
>>>>>>> a very good direction we are taking :). And I am sure that when I go
>>>>>>> for
>>>>>>> my
>>>>>>> planned 3 weeks holidays before the summit, everything will work as
>>>>>>> smoothly as when I am here - at least.
>>>>>>> 
>>>>>>> Also I think there is quite a difference (when it comes to
>>>>>>> scheduling)
>>>>>>> when
>>>>>>> you have mapped tasks versus "regular tasks". I think Airflow even
>>>>>>> currently behaves rather differently in those two different cases,
>>>>>>> and
>>>>>>> also
>>>>>>> it has a well-thought and optimized UI experience to handle thousands
>>>>>>> of
>>>>>>> them. Also the work of David Blain on Lazy Expandable Task Mapping
>>>>>>> will
>>>>>>> push the boundaries of what is possible there as well:
>>>>>>> https://github.com/apache/airflow/pull/51391. Even if we solve
>>>>>>> scheduling
>>>>>>> optimization - the UI and ability to monitor such huge Dags is still
>>>>>>> likely
>>>>>>> not something our UI was designed for.
>>>>>>> 
>>>>>>> And I am fully on board with "splitting to even smaller pieces" and
>>>>>>> "modularizing" things - and "modularizing and splitting big Dags into
>>>>>>> smaller Dags" feels like precisely what should be done. And I think
>>>>>>> it
>>>>>>> would be a nice idea to try it and follow and see if you can't
>>>>>>> achieve
>>>>>>> the
>>>>>>> same results without adding complexity.
>>>>>>> 
>>>>>>> J.
>>>>>>> 
>>>>>>> On Tue, Aug 5, 2025 at 8:47 PM Ash Berlin-Taylor a...@apache.org
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Yeah dynamic task mapping is a good case where you could easily end
>>>>>>>> up
>>>>>>>> with thousands of tasksof in a dag.
>>>>>>>> 
>>>>>>>> As I like to say, Airflow is a broad church and if we’re can
>>>>>>>> reasonably
>>>>>>>> support diverse workloads without impacting others (either the
>>>>>>>> workloads
>>>>>>>> out our available to support and maintain etc) then I’m all for it.
>>>>>>>> 
>>>>>>>> In addition to your two items I’d like to add
>>>>>>>> 
>>>>>>>> 3. That it doesn’t increase the db’s CPU disproportionally to the
>>>>>>>> increased task throughput
>>>>>>>> 
>>>>>>>>> On 5 Aug 2025, at 19:14, asquator asqua...@proton.me.invalid
>>>>>>>>> wrote:
>>>>>>>>> I'm glad this issue finally got enough attention and we can move
>>>>>>>>> it
>>>>>>>>> forward.
>>>>>>>>> I took a look at @Christos's patch and it makes sense overall, it's
>>>>>>>>> fine
>>>>>>>>> for the specific problem they experienced with max_active_tasks
>>>>>>>>> limit.
>>>>>>>>> For those unfamiliar with the core problem, the bug has a plenty of
>>>>>>>>> variations where starvation happens due to different concurrency
>>>>>>>>> limitations being nearly satiated, which creates the opportunity for
>>>>>>>>> the
>>>>>>>>> scheduler to pull many tasks and schedule none of them.
>>>>>>>>> To reproduce this bug, you need two conditions:
>>>>>>>>> 1. Many tasks (>> max_tis) belonging to one "pool", where "pool" is
>>>>>>>>> some
>>>>>>>>> concurrency limitation of Airflow. Note that originally the bug was
>>>>>>>>> discovered in context of task pools (see
>>>>>>>>> https://github.com/apache/airflow/issues/45636).
>>>>>>>>> 2. The tasks are short enough (or the parallelism is large enough)
>>>>>>>>> for
>>>>>>>>> the tasks from the nearly starved pool to free some slots in every
>>>>>>>>> scheduler's iteration.
>>>>>>>>> When we discovered a bug that starved our less prioritized pool,
>>>>>>>>> even
>>>>>>>>> when the most prioritized pool was almost full (thanks to
>>>>>>>>> @nevcohen),
>>>>>>>>> we
>>>>>>>>> wanted to implement a similar patch @Christos suggested above, but
>>>>>>>>> for
>>>>>>>>> pools. But then we realized this issue can arise due to limits
>>>>>>>>> different
>>>>>>>>> from task pools, including:
>>>>>>>>> max_active_tasks
>>>>>>>>> max_active_tis_per_dag
>>>>>>>>> max_active_tis_per_dagrun
>>>>>>>>> 
>>>>>>>>> So we were able to predict the forecoming bug reports for different
>>>>>>>>> kinds of starvation, and we started working on the most general
>>>>>>>>> solution
>>>>>>>>> which is the topic of this discussion.
>>>>>>>>> I want to also answer @potiuk regarding "why you need such large
>>>>>>>>> DAGs",
>>>>>>>>> but I will be brief.
>>>>>>>>> Airflow is an advanced tool for scheduling large data operations,
>>>>>>>>> and
>>>>>>>>> over the years it has pushed to production many features that lead
>>>>>>>>> to
>>>>>>>>> organizations writing DAGs that contain thousands of tasks. Most
>>>>>>>>> prominent
>>>>>>>>> one is dynamic task mapping. This feature made us realize we can
>>>>>>>>> implement
>>>>>>>>> a batching work queue pattern and create a task for every unit we
>>>>>>>>> have
>>>>>>>>> to
>>>>>>>>> process, say it's a file in a specific folder, a path in the
>>>>>>>>> filesystem,
>>>>>>>>> a
>>>>>>>>> pointer to some data stored in object storage, etc. We like to think
>>>>>>>>> in
>>>>>>>>> terms of splitting the work into many tasks. Is it good? I don't
>>>>>>>>> know,
>>>>>>>>> but
>>>>>>>>> Airflow has already stepped onto this path, and we have to make it
>>>>>>>>> technologically possible (if we can).
>>>>>>>>> Nevertheless, even if such DAGs are considered too big and
>>>>>>>>> splitting
>>>>>>>>> them is a good idea (though you still have nothing to do with mapped
>>>>>>>>> tasks
>>>>>>>>> - we create tens of thousands of them sometimes and expect them to
>>>>>>>>> be
>>>>>>>>> processed in parallel), this issue does not only address the
>>>>>>>>> described
>>>>>>>>> case, but many others, including prioritized pools, mapped tasks or
>>>>>>>>> max_active_runs starvation on large backfills.
>>>>>>>>> The only part that's missing now is measuring query time (static
>>>>>>>>> benchmarks) and measuring overall scheduling metrics in production
>>>>>>>>> workloads (dynamic benchmarks).
>>>>>>>>> We're working hard on this crucial part now.
>>>>>>>>> 
>>>>>>>>> We'd be happy to have any assistance from the community as regard
>>>>>>>>> to
>>>>>>>>> the
>>>>>>>>> dynamic benchmarks, because every workload is different and it's
>>>>>>>>> pretty
>>>>>>>>> difficult to simulate the general case in such a hard-to-reproduce
>>>>>>>>> issue.
>>>>>>>>> We have to make sure that:
>>>>>>>>> 1. In a busy workload, the new logic boosts the scheduler's
>>>>>>>>> throughput.
>>>>>>>>> 2. In a light workload, the nested windowing doesn't significantly
>>>>>>>>> slow
>>>>>>>>> down the computation.
>>>>>>>>> 
>>>>>>>>>> On Monday, August 4th, 2025 at 9:00 PM, Christos Bisias <
>>>>>>>>>> christos...@gmail.com> wrote:
>>>>>>>>>> I created a draft PR for anyone interested to take a look at the
>>>>>>>>>> code
>>>>>>>>>> https://github.com/apache/airflow/pull/54103
>>>>>>>>>> 
>>>>>>>>>> I was able to demonstrate the issue in the unit test with much
>>>>>>>>>> fewer
>>>>>>>>>> tasks.
>>>>>>>>>> All we need is the tasks brought back by the db query to belong to
>>>>>>>>>> the
>>>>>>>>>> same
>>>>>>>>>> dag_run or dag. This can happen when the first SCHEDULED tasks in
>>>>>>>>>> line
>>>>>>>>>> to
>>>>>>>>>> be examined are at least as many as the number of the tis per
>>>>>>>>>> query.
>>>>>>>>>> 
>>>>>>>>>> On Mon, Aug 4, 2025 at 8:37 PM Daniel Standish
>>>>>>>>>> daniel.stand...@astronomer.io.invalid wrote:
>>>>>>>>>> 
>>>>>>>>>>>> The configurability was my recommendation for
>>>>>>>>>>>> https://github.com/apache/airflow/pull/53492
>>>>>>>>>>>> Given the fact that this change is at the heart of Airflow I
>>>>>>>>>>>> think
>>>>>>>>>>>> the
>>>>>>>>>>>> changes should be experimental where users can switch between
>>>>>>>>>>>> different
>>>>>>>>>>>> strategies/modes of the scheduler.
>>>>>>>>>>>> If and when we have enough data to support that specific option
>>>>>>>>>>>> is
>>>>>>>>>>>> always
>>>>>>>>>>>> better we can make decisions accordingly.
>>>>>>>>>>>> Yeah I guess looking at #53492
>>>>>>>>>>>> https://github.com/apache/airflow/pull/53492 it does seem too
>>>>>>>>>>>> risky
>>>>>>>>>>>> to
>>>>>>>>>>>> just change the behavior in airflow without releasing it first as
>>>>>>>>>>>> experimental.
>>>>>>>>>>> 
>>>>>>>>>>> I doubt we can get sufficient real world testing without doing
>>>>>>>>>>> that.
>>>>>>>>>>> So if this is introduced, I think it should just be introduced as
>>>>>>>>>>> experimental optimization. And the intention would be that
>>>>>>>>>>> ultimately
>>>>>>>>>>> there will only be one scheduling mode, and this is just a way to
>>>>>>>>>>> test
>>>>>>>>>>> this
>>>>>>>>>>> out more widely. Not that we are intending to have two scheduling
>>>>>>>>>>> code
>>>>>>>>>>> paths on a permanent basis.
>>>>>>>>>>> 
>>>>>>>>>>> WDYT
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Aug 4, 2025 at 12:50 AM Christos Bisias
>>>>>>>>>>> christos...@gmail.com
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>>> So my question to you is: is it impossible, or just demanding
>>>>>>>>>>>>> or
>>>>>>>>>>>>> difficult
>>>>>>>>>>>>> to split your Dags into smaller dags connected with asset aware
>>>>>>>>>>>>> scheduling?
>>>>>>>>>>>>> Jarek, I'm going to discuss this with the team and I will get
>>>>>>>>>>>>> you
>>>>>>>>>>>>> an
>>>>>>>>>>>>> answer
>>>>>>>>>>>>> on that.
>>>>>>>>>>>> 
>>>>>>>>>>>> I've shared this again on the thread
>>> 
>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>>> 
>>>>>>>>>>>> I haven't created a PR because this is just a POC and it's also
>>>>>>>>>>>> setting a
>>>>>>>>>>>> limit per dag. I would like to get feedback on whether it's
>>>>>>>>>>>> better
>>>>>>>>>>>> to
>>>>>>>>>>>> make
>>>>>>>>>>>> it per dag or per dag_run.
>>>>>>>>>>>> I can create a draft PR if that's helpful and makes it easier to
>>>>>>>>>>>> add
>>>>>>>>>>>> comments.
>>>>>>>>>>>> 
>>>>>>>>>>>> Let me try to explain the issue better. From a high level
>>>>>>>>>>>> overview,
>>>>>>>>>>>> the
>>>>>>>>>>>> scheduler
>>>>>>>>>>>> 
>>>>>>>>>>>> 1. moves tasks to SCHEDULED
>>>>>>>>>>>> 2. runs a query to fetch SCHEDULED tasks from the db
>>>>>>>>>>>> 3. examines the tasks
>>>>>>>>>>>> 4. moves tasks to QUEUED
>>>>>>>>>>>> 
>>>>>>>>>>>> I'm focusing on step 2 and afterwards. The current code doesn't
>>>>>>>>>>>> take
>>>>>>>>>>>> into
>>>>>>>>>>>> account the max_active_tasks_per_dag. When it runs the query it
>>>>>>>>>>>> fetches
>>>>>>>>>>>> up to max_tis which is determined here
>>>>>>>>>>>> <
>>> 
>>> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L697-L705
>>> 
>>>>>>>>>>>> .
>>>>>>>>>>>> 
>>>>>>>>>>>> For example,
>>>>>>>>>>>> 
>>>>>>>>>>>> - if the query number is 32
>>>>>>>>>>>> - all 32 tasks in line belong to the same dag, dag1
>>>>>>>>>>>> - we are not concerned how the scheduler picks them
>>>>>>>>>>>> - dag1 has max_active_tasks set to 5
>>>>>>>>>>>> 
>>>>>>>>>>>> The current code will
>>>>>>>>>>>> 
>>>>>>>>>>>> - get 32 tasks from dag1
>>>>>>>>>>>> - start examining them one by one
>>>>>>>>>>>> - once 5 are moved to QUEUED, it won't stop, it will keep
>>>>>>>>>>>> examining
>>>>>>>>>>>> the other 27 but won't be able to queue them because it has
>>>>>>>>>>>> reached
>>>>>>>>>>>> the
>>>>>>>>>>>> limit
>>>>>>>>>>>> 
>>>>>>>>>>>> In the next loop, although we have reached the maximum number of
>>>>>>>>>>>> tasks
>>>>>>>>>>>> for
>>>>>>>>>>>> dag1, the query will fetch again 32 tasks from dag1 to examine
>>>>>>>>>>>> them
>>>>>>>>>>>> and
>>>>>>>>>>>> try to queue them.
>>>>>>>>>>>> 
>>>>>>>>>>>> The issue is that it gets more tasks than it can queue from the
>>>>>>>>>>>> db
>>>>>>>>>>>> and
>>>>>>>>>>>> then
>>>>>>>>>>>> examines them all.
>>>>>>>>>>>> 
>>>>>>>>>>>> This all leads to unnecessary processing that builds up and the
>>>>>>>>>>>> more
>>>>>>>>>>>> load
>>>>>>>>>>>> there is on the system, the more the throughput drops for the
>>>>>>>>>>>> scheduler
>>>>>>>>>>>> and
>>>>>>>>>>>> the workers.
>>>>>>>>>>>> 
>>>>>>>>>>>> What I'm proposing is to adjust the query in step 2, to check
>>>>>>>>>>>> the
>>>>>>>>>>>> max_active_tasks_per_dag
>>>>>>>>>>>> 
>>>>>>>>>>>>> run a query to fetch SCHEDULED tasks from the db
>>>>>>>>>>>>> If a dag has already reached the maximum number of tasks in
>>>>>>>>>>>>> active
>>>>>>>>>>>>> states,
>>>>>>>>>>>>> it will be skipped by the query.
>>>>>>>>>>>> 
>>>>>>>>>>>> Don't we already stop examining at that point? I guess there's
>>>>>>>>>>>> two
>>>>>>>>>>>> things
>>>>>>>>>>>> 
>>>>>>>>>>>>> you might be referring to. One is, which TIs come out of the db
>>>>>>>>>>>>> and
>>>>>>>>>>>>> into
>>>>>>>>>>>>> python, and the other is, what we do in python. Just might be
>>>>>>>>>>>>> helpful
>>>>>>>>>>>>> to
>>>>>>>>>>>>> be clear about the specific enhancements & changes you are
>>>>>>>>>>>>> making.
>>>>>>>>>>>>> I think that if we adjust the query and fetch the right number
>>>>>>>>>>>>> of
>>>>>>>>>>>>> tasks,
>>>>>>>>>>>>> then we won't have to make changes to what is done in python.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Mon, Aug 4, 2025 at 8:01 AM Daniel Standish
>>>>>>>>>>>> daniel.stand...@astronomer.io.invalid wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> @Christos Bisias
>>>>>>>>>>>>> 
>>>>>>>>>>>>> If you have a very large dag, and its tasks have been
>>>>>>>>>>>>> scheduled,
>>>>>>>>>>>>> then
>>>>>>>>>>>>> the
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> scheduler will keep examining the tasks for queueing, even if
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>> has
>>>>>>>>>>>>>> reached the maximum number of active tasks for that particular
>>>>>>>>>>>>>> dag.
>>>>>>>>>>>>>> Once
>>>>>>>>>>>>>> that fails, then it will move on to examine the scheduled
>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> next
>>>>>>>>>>>>>> dag or dag_run in line.
>>>>>>>>>>>>>> Can you make this a little more precise? There's some
>>>>>>>>>>>>>> protection
>>>>>>>>>>>>>> against
>>>>>>>>>>>>>> "starvation" i.e. dag runs recently considered should go to the
>>>>>>>>>>>>>> back
>>>>>>>>>>>>>> of
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> line next time.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Maybe you could clarify why / how that's not working / not
>>>>>>>>>>>>> optimal
>>>>>>>>>>>>> /
>>>>>>>>>>>>> how
>>>>>>>>>>>>> to
>>>>>>>>>>>>> improve.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> If there are available slots in the pool and
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> the max parallelism hasn't been reached yet, then the
>>>>>>>>>>>>>> scheduler
>>>>>>>>>>>>>> should
>>>>>>>>>>>>>> stop
>>>>>>>>>>>>>> processing a dag that has already reached its max capacity of
>>>>>>>>>>>>>> active
>>>>>>>>>>>>>> tasks.
>>>>>>>>>>>>>> If a dag run (or dag) is already at max capacity, it doesn't
>>>>>>>>>>>>>> really
>>>>>>>>>>>>>> matter
>>>>>>>>>>>>>> if there are slots available or parallelism isn't reached --
>>>>>>>>>>>>>> shouldn't
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>> stop anyway?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> In addition, the number of scheduled tasks picked for
>>>>>>>>>>>>> examining,
>>>>>>>>>>>>> should
>>>>>>>>>>>>> be
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> capped at the number of max active tasks if that's lower than
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> query
>>>>>>>>>>>>>> limit. If the active limit is 10 and we already have 5
>>>>>>>>>>>>>> running,
>>>>>>>>>>>>>> then
>>>>>>>>>>>>>> we
>>>>>>>>>>>>>> can
>>>>>>>>>>>>>> queue at most 5 tasks. In that case, we shouldn't examine more
>>>>>>>>>>>>>> than
>>>>>>>>>>>>>> that.
>>>>>>>>>>>>>> Don't we already stop examining at that point? I guess there's
>>>>>>>>>>>>>> two
>>>>>>>>>>>>>> things
>>>>>>>>>>>>>> you might be referring to. One is, which TIs come out of the db
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>> into
>>>>>>>>>>>>>> python, and the other is, what we do in python. Just might be
>>>>>>>>>>>>>> helpful
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>> be clear about the specific enhancements & changes you are
>>>>>>>>>>>>>> making.
>>>>>>>>>>>>>> There is already a patch with the changes mentioned above. IMO,
>>>>>>>>>>>>>> these
>>>>>>>>>>>>>> changes should be enabled/disabled with a config flag and not
>>>>>>>>>>>>>> by
>>>>>>>>>>>>>> default
>>>>>>>>>>>>>> because not everyone has the same needs as us. In our testing,
>>>>>>>>>>>>>> adding a
>>>>>>>>>>>>>> limit on the tasks retrieved from the db requires more
>>>>>>>>>>>>>> processing
>>>>>>>>>>>>>> on
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> query which actually makes things worse when you have multiple
>>>>>>>>>>>>>> small
>>>>>>>>>>>>>> dags.
>>>>>>>>>>>>>> I would like to see a stronger case made for configurability.
>>>>>>>>>>>>>> Why
>>>>>>>>>>>>>> make
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>> configurable? If the performance is always better, it should
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>> made
>>>>>>>>>>>>>> configurable. Unless it's merely released as an opt-in
>>>>>>>>>>>>>> experimental
>>>>>>>>>>>>>> feature. If it is worse in some profiles, let's be clear about
>>>>>>>>>>>>>> that.
>>>>>>>>>>>>>> I did not read anything after `Here is a simple test case that 
>>>>>>>>>>>>>> makes the benefits of the improvements noticeable` because, it 
>>>>>>>>>>>>>> seemed
>>>>>>>>>>>>>> rather
>>>>>>>>>>>>>> long
>>>>>>>>>>>>>> winded detail about a test
>>>>>>>>>>>>>> case. A higher level summary might be helpful to your audience.
>>>>>>>>>>>>>> Is
>>>>>>>>>>>>>> there
>>>>>>>>>>>>>> a PR with your optimization. You wrote "there is a patch" but
>>>>>>>>>>>>>> did
>>>>>>>>>>>>>> not,
>>>>>>>>>>>>>> unless I miss something, share it. I would take a look if you
>>>>>>>>>>>>>> share
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>> though.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 5:08 PM Daniel Standish <
>>>>>>>>>>>>> daniel.stand...@astronomer.io> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Yes Ui is another part of this.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> At some point the grid and graph views completely stop making
>>>>>>>>>>>>>> sense
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>> that volume, and another type of view would be required both
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>> usability
>>>>>>>>>>>>>> and performance
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 11:04 AM Jens Scheffler
>>>>>>>>>>>>>> j_scheff...@gmx.de.invalid
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> We also have a current demand to have a workflow to execute
>>>>>>>>>>>>>>> 10k
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> 100k
>>>>>>>>>>>>>>> tasks. Together with @AutomationDev85 we are working on a
>>>>>>>>>>>>>>> local
>>>>>>>>>>>>>>> solution
>>>>>>>>>>>>>>> because we also saw problems in the Scheduler that are not
>>>>>>>>>>>>>>> linearly
>>>>>>>>>>>>>>> scaling. And for sure not easy to be fixed. But from our
>>>>>>>>>>>>>>> investigation
>>>>>>>>>>>>>>> also there are other problems to be considered like UI will
>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>> potentially have problems.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I am a bit sceptic that PR 49160 completely fixes the
>>>>>>>>>>>>>>> problems
>>>>>>>>>>>>>>> mentioned
>>>>>>>>>>>>>>> here and made some comments. I do not want to stop enthusiasm
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> fix
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>> improve things but the Scheduler is quite complex and changed
>>>>>>>>>>>>>>> need
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> made with care.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Actually I like the patch
>>> 
>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>>> 
>>>>>>>>>>>>>>> as it just adds some limit preventing scheduler to focus on
>>>>>>>>>>>>>>> only
>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>> run. But complexity is a bit big for a "patch" :-D
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> I'd also propose atm the way that Jarek described and
>>>>>>>>>>>>>>> split-up
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> Dag
>>>>>>>>>>>>>>> into multiple parts (divide and conquer) for the moment.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Otherwise if there is a concrete demand on such large Dags...
>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>> maybe
>>>>>>>>>>>>>>> need rather a broader initiative if we want to ensure 10k,
>>>>>>>>>>>>>>> 100k,
>>>>>>>>>>>>>>> 1M?
>>>>>>>>>>>>>>> tasks are supported per Dag. Because depending on the
>>>>>>>>>>>>>>> magnitude
>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>> strive for different approaches are needed.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Jens
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 03.08.25 16:33, Daniel Standish wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Definitely an area of the scheduler with some opportunity
>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>> performance
>>>>>>>>>>>>>>>> improvement.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I would just mention that, you should also attempt to
>>>>>>>>>>>>>>>> include
>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>> performance testing at load / scale because, window
>>>>>>>>>>>>>>>> functions
>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>> going
>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>> be more expensive.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> What happens when you have many dags, many historical dag
>>>>>>>>>>>>>>>> runs &
>>>>>>>>>>>>>>>> TIs,
>>>>>>>>>>>>>>>> lots
>>>>>>>>>>>>>>>> of stuff running concurrently. You need to be mindful of the
>>>>>>>>>>>>>>>> overall
>>>>>>>>>>>>>>>> impact of such a change, and not look only at the time spent
>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>> scheduling
>>>>>>>>>>>>>>>> this particular dag.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I did not look at the PRs yet, maybe you've covered this,
>>>>>>>>>>>>>>>> but,
>>>>>>>>>>>>>>>> it's
>>>>>>>>>>>>>>>> important.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias<
>>>>>>>>>>>>>>>> christos...@gmail.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I'm going to review the PR code and test it more thoroughly
>>>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>> leaving
>>>>>>>>>>>>>>>>> a comment.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> This is my code for reference
>>> 
>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>>> 
>>>>>>>>>>>>>>>>> The current version is setting a limit per dag, across all
>>>>>>>>>>>>>>>>> dag_runs.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Please correct me if I'm wrong, but the PR looks like it's
>>>>>>>>>>>>>>>>> changing
>>>>>>>>>>>>>>>>> the way
>>>>>>>>>>>>>>>>> that tasks are prioritized to avoid starvation. If that's
>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> case,
>>>>>>>>>>>>>>>>> I'm not
>>>>>>>>>>>>>>>>> sure that this is the same issue. My proposal is that, if
>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>> reached
>>>>>>>>>>>>>>>>> the max resources assigned to a dag, then stop processing
>>>>>>>>>>>>>>>>> its
>>>>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>> move on to the next one. I'm not changing how or which
>>>>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>> picked.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 3:23 PM asquator<asqua...@proton.me
>>>>>>>>>>>>>>>>> .invalid>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> Thank you for the feedback.
>>>>>>>>>>>>>>>>>> Please, describe the case with failing limit checks in the
>>>>>>>>>>>>>>>>>> PR
>>>>>>>>>>>>>>>>>> (DAG's
>>>>>>>>>>>>>>>>>> parameters and it's tasks' parameters and what fails to be
>>>>>>>>>>>>>>>>>> checked)
>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> we'll try to fix it ASAP before you can test it again.
>>>>>>>>>>>>>>>>>> Let's
>>>>>>>>>>>>>>>>>> continue
>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>> PR-related discussion in the PR itself.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <
>>>>>>>>>>>>>>>>>> christos...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thank you for bringing this PR to my attention.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I haven't studied the code but I ran a quick test on the
>>>>>>>>>>>>>>>>>>> branch
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>> completely ignores the limit on scheduled tasks per dag
>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>> dag_run.
>>>>>>>>>>>>>>>>>>> It
>>>>>>>>>>>>>>>>>>> grabbed 70 tasks from the first dag and then moved all 70
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> QUEUED
>>>>>>>>>>>>>>>>>>> without
>>>>>>>>>>>>>>>>>>> any further checks.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> This is how I tested it
>>> 
>>> https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
>>> 
>>>>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 1:44 PM
>>>>>>>>>>>>>>>>>>> asquatorasqua...@proton.me
>>>>>>>>>>>>>>>>>>> .invalid
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> This is a known issue stemming from the optimistic
>>>>>>>>>>>>>>>>>>>> scheduling
>>>>>>>>>>>>>>>>>>>> strategy
>>>>>>>>>>>>>>>>>>>> used in Airflow. We do address this in the
>>>>>>>>>>>>>>>>>>>> above-mentioned
>>>>>>>>>>>>>>>>>>>> PR. I
>>>>>>>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> note that there are many cases where this problem may
>>>>>>>>>>>>>>>>>>>> appear—it
>>>>>>>>>>>>>>>>>>>> was
>>>>>>>>>>>>>>>>>>>> originally detected with pools, but we are striving to
>>>>>>>>>>>>>>>>>>>> fix
>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>> all
>>>>>>>>>>>>>>>>>>>> cases,
>>>>>>>>>>>>>>>>>>>> such as the one described here with
>>>>>>>>>>>>>>>>>>>> max_active_tis_per_dag,
>>>>>>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>>>> switching to
>>>>>>>>>>>>>>>>>>>> pessimistic scheduling with SQL window functions. While
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>>> strategy simply pulls the max_tis tasks and drops the
>>>>>>>>>>>>>>>>>>>> ones
>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> do
>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>> meet
>>>>>>>>>>>>>>>>>>>> the constraints, the new strategy will pull only the
>>>>>>>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>> are
>>>>>>>>>>>>>>>>>>>> actually ready to be scheduled and that comply with all
>>>>>>>>>>>>>>>>>>>> concurrency
>>>>>>>>>>>>>>>>>>>> limits.
>>>>>>>>>>>>>>>>>>>> It would be very helpful for pushing this change to
>>>>>>>>>>>>>>>>>>>> production
>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>> could assist us in alpha-testing it.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> See also:
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/airflow/discussions/49160
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Sent with Proton Mail secure email.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif
>>>>>>>>>>>>>>>>>>>> elad...@apache.org
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> i think most of your issues will be addressed by
>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/airflow/pull/53492
>>>>>>>>>>>>>>>>>>>>> The PR code can be tested with Breeze so you can set it
>>>>>>>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>>>>>>> if it
>>>>>>>>>>>>>>>>>>>>> solves the problem this will also help with confirming
>>>>>>>>>>>>>>>>>>>>> it's
>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>> right
>>>>>>>>>>>>>>>>>>>>> fix.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias
>>>>>>>>>>>>>>>>>>>>> christos...@gmail.com
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> The scheduler is very efficient when running a large
>>>>>>>>>>>>>>>>>>>>>> amount
>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> dags
>>>>>>>>>>>>>>>>>>>>>> with up
>>>>>>>>>>>>>>>>>>>>>> to 1000 tasks each. But in our case, we have dags with
>>>>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>> many
>>>>>>>>>>>>>>>>>>>>>> as
>>>>>>>>>>>>>>>>>>>>>> 10.000
>>>>>>>>>>>>>>>>>>>>>> tasks. And in that scenario the scheduler and worker
>>>>>>>>>>>>>>>>>>>>>> throughput
>>>>>>>>>>>>>>>>>>>>>> drops
>>>>>>>>>>>>>>>>>>>>>> significantly. Even if you have 1 such large dag with
>>>>>>>>>>>>>>>>>>>>>> scheduled
>>>>>>>>>>>>>>>>>>>>>> tasks,
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> performance hit becomes noticeable.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> We did some digging and we found that the issue comes
>>>>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> scheduler's
>>>>>>>>>>>>>>>>>>>>>> _executable_task_instances_to_queued
>>>>>>>>>>>>>>>>>>>>>> <
>>> 
>>> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647
>>> 
>>>>>>>>>>>>>>>>>>>>>> method.
>>>>>>>>>>>>>>>>>>>>>> In particular with the db query here
>>>>>>>>>>>>>>>>>>>>>> <
>>> 
>>> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375
>>> 
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> examining the results here
>>>>>>>>>>>>>>>>>>>>>> <
>>> 
>>> https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425
>>> 
>>>>>>>>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> If you have a very large dag, and its tasks have been
>>>>>>>>>>>>>>>>>>>>>> scheduled,
>>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> scheduler will keep examining the tasks for queueing,
>>>>>>>>>>>>>>>>>>>>>> even
>>>>>>>>>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>>>>>>> reached the maximum number of active tasks for that
>>>>>>>>>>>>>>>>>>>>>> particular
>>>>>>>>>>>>>>>>>>>>>> dag.
>>>>>>>>>>>>>>>>>>>>>> Once
>>>>>>>>>>>>>>>>>>>>>> that fails, then it will move on to examine the
>>>>>>>>>>>>>>>>>>>>>> scheduled
>>>>>>>>>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> next
>>>>>>>>>>>>>>>>>>>>>> dag or dag_run in line.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> This is inefficient and causes the throughput of the
>>>>>>>>>>>>>>>>>>>>>> scheduler
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> workers to drop significantly. If there are available
>>>>>>>>>>>>>>>>>>>>>> slots
>>>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> pool and
>>>>>>>>>>>>>>>>>>>>>> the max parallelism hasn't been reached yet, then the
>>>>>>>>>>>>>>>>>>>>>> scheduler
>>>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>> stop
>>>>>>>>>>>>>>>>>>>>>> processing a dag that has already reached its max
>>>>>>>>>>>>>>>>>>>>>> capacity
>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> active
>>>>>>>>>>>>>>>>>>>>>> tasks.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> In addition, the number of scheduled tasks picked for
>>>>>>>>>>>>>>>>>>>>>> examining,
>>>>>>>>>>>>>>>>>>>>>> should be
>>>>>>>>>>>>>>>>>>>>>> capped at the number of max active tasks if that's
>>>>>>>>>>>>>>>>>>>>>> lower
>>>>>>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> query
>>>>>>>>>>>>>>>>>>>>>> limit. If the active limit is 10 and we already have 5
>>>>>>>>>>>>>>>>>>>>>> running,
>>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>> we can
>>>>>>>>>>>>>>>>>>>>>> queue at most 5 tasks. In that case, we shouldn't
>>>>>>>>>>>>>>>>>>>>>> examine
>>>>>>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>>> than
>>>>>>>>>>>>>>>>>>>>>> that.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> There is already a patch with the changes mentioned
>>>>>>>>>>>>>>>>>>>>>> above.
>>>>>>>>>>>>>>>>>>>>>> IMO,
>>>>>>>>>>>>>>>>>>>>>> these
>>>>>>>>>>>>>>>>>>>>>> changes should be enabled/disabled with a config flag
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>> by
>>>>>>>>>>>>>>>>>>>>>> default
>>>>>>>>>>>>>>>>>>>>>> because not everyone has the same needs as us. In our
>>>>>>>>>>>>>>>>>>>>>> testing,
>>>>>>>>>>>>>>>>>>>>>> adding a
>>>>>>>>>>>>>>>>>>>>>> limit on the tasks retrieved from the db requires more
>>>>>>>>>>>>>>>>>>>>>> processing
>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> query which actually makes things worse when you have
>>>>>>>>>>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>>>>>> small
>>>>>>>>>>>>>>>>>>>>>> dags.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Here is a simple test case that makes the benefits of
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> improvements
>>>>>>>>>>>>>>>>>>>>>> noticeable
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> - we have 3 dags with thousands of tasks each
>>>>>>>>>>>>>>>>>>>>>> - for simplicity let's have 1 dag_run per dag
>>>>>>>>>>>>>>>>>>>>>> - triggering them takes some time and due to that, the
>>>>>>>>>>>>>>>>>>>>>> FIFO
>>>>>>>>>>>>>>>>>>>>>> order
>>>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> tasks is very clear
>>>>>>>>>>>>>>>>>>>>>> - e.g. 1000 tasks from dag1 were scheduled first and
>>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>> 200
>>>>>>>>>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>>>>>>>> from dag2 etc.
>>>>>>>>>>>>>>>>>>>>>> - the executor has parallelism=100 and
>>>>>>>>>>>>>>>>>>>>>> slots_available=100
>>>>>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>>> means
>>>>>>>>>>>>>>>>>>>>>> that it can run up to 100 tasks concurrently
>>>>>>>>>>>>>>>>>>>>>> - max_active_tasks_per_dag is 4 which means that we
>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> 4
>>>>>>>>>>>>>>>>>>>>>> tasks running per dag.
>>>>>>>>>>>>>>>>>>>>>> - For 3 dags, it means that we can run up to 12 tasks
>>>>>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>>>>> time (4 tasks from each dag)
>>>>>>>>>>>>>>>>>>>>>> - max tis per query are set to 32, meaning that we can
>>>>>>>>>>>>>>>>>>>>>> examine
>>>>>>>>>>>>>>>>>>>>>> up
>>>>>>>>>>>>>>>>>>>>>> to 32
>>>>>>>>>>>>>>>>>>>>>> scheduled tasks if there are available pool slots
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> If we were to run the scheduler loop repeatedly until
>>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>> queues
>>>>>>>>>>>>>>>>>>>>>> 12
>>>>>>>>>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>>>>>>>> and test the part that examines the scheduled tasks
>>>>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>>>> queues
>>>>>>>>>>>>>>>>>>>>>> them,
>>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> - with the query limit
>>>>>>>>>>>>>>>>>>>>>> - 1 iteration, total time 0.05
>>>>>>>>>>>>>>>>>>>>>> - During the iteration
>>>>>>>>>>>>>>>>>>>>>> - we have parallelism 100, available slots 100 and
>>>>>>>>>>>>>>>>>>>>>> query
>>>>>>>>>>>>>>>>>>>>>> limit
>>>>>>>>>>>>>>>>>>>>>> 32
>>>>>>>>>>>>>>>>>>>>>> which means that it will examine up to 32 scheduled
>>>>>>>>>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>>>>>>>> - it can queue up to 100 tasks
>>>>>>>>>>>>>>>>>>>>>> - examines 12 tasks (instead of 32)
>>>>>>>>>>>>>>>>>>>>>> - 4 tasks from dag1, reached max for the dag
>>>>>>>>>>>>>>>>>>>>>> - 4 tasks from dag2, reached max for the dag
>>>>>>>>>>>>>>>>>>>>>> - and 4 tasks from dag3, reached max for the dag
>>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag1, reaches max for the dag and
>>>>>>>>>>>>>>>>>>>>>> moves
>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag2, reaches max for the dag and
>>>>>>>>>>>>>>>>>>>>>> moves
>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag3, reaches max for the dag and
>>>>>>>>>>>>>>>>>>>>>> moves
>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>> - stops queueing because we have reached the maximum
>>>>>>>>>>>>>>>>>>>>>> per
>>>>>>>>>>>>>>>>>>>>>> dag,
>>>>>>>>>>>>>>>>>>>>>> although there are slots for more tasks
>>>>>>>>>>>>>>>>>>>>>> - iteration finishes
>>>>>>>>>>>>>>>>>>>>>> - without
>>>>>>>>>>>>>>>>>>>>>> - 3 iterations, total time 0.29
>>>>>>>>>>>>>>>>>>>>>> - During iteration 1
>>>>>>>>>>>>>>>>>>>>>> - Examines 32 tasks, all from dag1 (due to FIFO)
>>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag1 and tries to queue the other 28
>>>>>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>> fails
>>>>>>>>>>>>>>>>>>>>>> - During iteration 2
>>>>>>>>>>>>>>>>>>>>>> - examines the next 32 tasks from dag1
>>>>>>>>>>>>>>>>>>>>>> - it can't queue any of them because it has reached
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> max
>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> dag1, since the previous 4 are still running
>>>>>>>>>>>>>>>>>>>>>> - examines 32 tasks from dag2
>>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag2 and tries to queue the other 28
>>>>>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>> fails
>>>>>>>>>>>>>>>>>>>>>> - During iteration 3
>>>>>>>>>>>>>>>>>>>>>> - examines the next 32 tasks from dag1, same tasks
>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> were
>>>>>>>>>>>>>>>>>>>>>> examined in iteration 2
>>>>>>>>>>>>>>>>>>>>>> - it can't queue any of them because it has reached
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> max
>>>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> dag1 and the first 4 are still running
>>>>>>>>>>>>>>>>>>>>>> - examines 32 tasks from dag2 , can't queue any of
>>>>>>>>>>>>>>>>>>>>>> them
>>>>>>>>>>>>>>>>>>>>>> because
>>>>>>>>>>>>>>>>>>>>>> it has reached max for dag2 as well
>>>>>>>>>>>>>>>>>>>>>> - examines 32 tasks from dag3
>>>>>>>>>>>>>>>>>>>>>> - queues 4 from dag3 and tries to queue the other 28
>>>>>>>>>>>>>>>>>>>>>> but
>>>>>>>>>>>>>>>>>>>>>> fails
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I used very low values for all the configs so that I
>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> point
>>>>>>>>>>>>>>>>>>>>>> clear and easy to understand. If we increase them,
>>>>>>>>>>>>>>>>>>>>>> then
>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>> patch
>>>>>>>>>>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>>>>>> makes the task selection more fair and the resource
>>>>>>>>>>>>>>>>>>>>>> distribution
>>>>>>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>>> even.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I would appreciate it if anyone familiar with the
>>>>>>>>>>>>>>>>>>>>>> scheduler's
>>>>>>>>>>>>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>> confirm this and also provide any feedback.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Additionally, I have one question regarding the query
>>>>>>>>>>>>>>>>>>>>>> limit.
>>>>>>>>>>>>>>>>>>>>>> Should it
>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>> per dag_run or per dag? I've noticed that
>>>>>>>>>>>>>>>>>>>>>> max_active_tasks_per_dag
>>>>>>>>>>>>>>>>>>>>>> has
>>>>>>>>>>>>>>>>>>>>>> been changed to provide a value per dag_run but the
>>>>>>>>>>>>>>>>>>>>>> docs
>>>>>>>>>>>>>>>>>>>>>> haven't
>>>>>>>>>>>>>>>>>>>>>> been
>>>>>>>>>>>>>>>>>>>>>> updated.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Thank you!
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>> Christos Bisias
>>>>>> 
>>>>>> ---------------------------------------------------------------------
>>>>>> 
>>>>>>>>>>>>>>>>>>>> To unsubscribe,
>>>>>>>>>>>>>>>>>>>> e-mail:dev-unsubscr...@airflow.apache.org
>>>>>>>>>>>>>>>>>>>> For additional commands,
>>>>>>>>>>>>>>>>>>>> e-mail:dev-h...@airflow.apache.org
>>>>>>> 
>>>>>>> ---------------------------------------------------------------------
>>>>>>> 
>>>>>>>>>>>>>>>>>> To unsubscribe, e-mail:dev-unsubscr...@airflow.apache.org
>>>>>>>>>>>>>>>>>> For additional commands,
>>>>>>>>>>>>>>>>>> e-mail:dev-h...@airflow.apache.org
>>> 
>>> ---------------------------------------------------------------------
>>> 
>>>>>>>>> To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
>>>>>>>>> For additional commands, e-mail: dev-h...@airflow.apache.org
>>> 
>>> ---------------------------------------------------------------------
>>> 
>>>>>>>> To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
>>>>>>>> For additional commands, e-mail: dev-h...@airflow.apache.org
>>>> 
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
>>>> For additional commands, e-mail: dev-h...@airflow.apache.org
> 
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
> For additional commands, e-mail: dev-h...@airflow.apache.org
> 


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
For additional commands, e-mail: dev-h...@airflow.apache.org

Reply via email to