>
> The configurability was my recommendation for
> https://github.com/apache/airflow/pull/53492
> Given the fact that this change is at the heart of Airflow I think the
> changes should be experimental where users can switch between different
> strategies/modes of the scheduler.
> If and when we have enough data to show that a specific option is always
> better, we can make decisions accordingly.

Yeah, looking at #53492
<https://github.com/apache/airflow/pull/53492>, it does seem too risky to
just change the behavior in Airflow without first releasing it as
experimental.

I doubt we can get sufficient real world testing without doing that.

So if this is introduced, I think it should be introduced only as an
experimental optimization.  The intention would be that ultimately there
will be only one scheduling mode, and this is just a way to test the change
more widely.  We are not intending to have two scheduling code paths on a
permanent basis.

WDYT
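
For anyone skimming, here is a rough Python model of the test case quoted
further down (3 dags, max active tasks per dag 4, max tis per query 32). It
is only a sketch of the behavior as described in this thread, not the actual
scheduler code: the function names, the in-memory task list and the way
"starved" dags are skipped on a re-query are all simplifying assumptions.

```python
from collections import Counter


def fetch_fifo(scheduled, limit, skip_dags):
    """Up to `limit` scheduled tasks in FIFO order, ignoring dags in `skip_dags`."""
    batch = []
    for task in scheduled:
        if task[0] in skip_dags:
            continue
        batch.append(task)
        if len(batch) == limit:
            break
    return batch


def fetch_capped(scheduled, limit, active, max_active):
    """Like fetch_fifo, but never returns more tasks per dag than it can still queue."""
    batch, taken = [], Counter()
    for task in scheduled:
        dag = task[0]
        if active[dag] + taken[dag] >= max_active:
            continue  # this dag has no remaining capacity, skip its tasks
        batch.append(task)
        taken[dag] += 1
        if len(batch) == limit:
            break
    return batch


def run_iteration(scheduled, active, max_tis, max_active, capped):
    """One modeled scheduler loop; returns how many tasks were examined."""
    examined = 0
    starved = set()  # dags found at their limit during this loop (assumed behavior)
    while True:
        if capped:
            batch = fetch_capped(scheduled, max_tis, active, max_active)
        else:
            batch = fetch_fifo(scheduled, max_tis, starved)
        if not batch:
            return examined
        queued = 0
        for task in batch:
            examined += 1
            dag = task[0]
            if active[dag] >= max_active:
                starved.add(dag)  # examined, but cannot be queued
                continue
            active[dag] += 1
            scheduled.remove(task)  # task moves from SCHEDULED to QUEUED
            queued += 1
        if queued:
            return examined
        # Nothing was queued: query again, skipping the dags found at their limit.


def simulate(capped, max_tis=32, max_active=4, target=12):
    """Run modeled loops until `target` tasks are queued; returns (iterations, examined)."""
    # FIFO order: all of dag1's tasks were scheduled before dag2's, and so on.
    scheduled = [(dag, i) for dag in ("dag1", "dag2", "dag3") for i in range(1000)]
    active = Counter()
    iterations = examined = 0
    while sum(active.values()) < target:
        iterations += 1
        examined += run_iteration(scheduled, active, max_tis, max_active, capped)
    return iterations, examined


if __name__ == "__main__":
    print("without per-dag cap:", simulate(capped=False))
    print("with per-dag cap:   ", simulate(capped=True))
```

Under these assumptions the uncapped run needs 3 iterations and examines 192
tasks to queue 12, while the capped run needs 1 iteration and examines 12.
That lines up with the iteration counts in the quoted test case, but it says
nothing about the extra cost of the query itself, which is the part that
needs real-world testing.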




On Mon, Aug 4, 2025 at 12:50 AM Christos Bisias <christos...@gmail.com>
wrote:

> > So my question to you is: is it impossible, or just demanding or difficult
> > to split your Dags into smaller dags connected with asset aware scheduling?
>
>
> Jarek, I'm going to discuss this with the team and I will get you an answer
> on that.
>
> I've shared this again on the thread
>
>
> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>
> I haven't created a PR because this is just a POC and it's also setting a
> limit per dag. I would like to get feedback on whether it's better to make
> it per dag or per dag_run.
> I can create a draft PR if that's helpful and makes it easier to add
> comments.
>
> Let me try to explain the issue better. From a high level overview, the
> scheduler
>
>    1. moves tasks to SCHEDULED
>    2. runs a query to fetch SCHEDULED tasks from the db
>    3. examines the tasks
>    4. moves tasks to QUEUED
>
> I'm focusing on step 2 and afterwards. The current code doesn't take into
> account the *max_active_tasks_per_dag*. When it runs the query it fetches
> up to *max_tis* which is determined here
> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L697-L705>.
>
> For example,
>
>    - if the query limit (*max_tis*) is 32
>    - all 32 tasks in line belong to the same dag, *dag1*
>       - we are not concerned how the scheduler picks them
>    - *dag1* has *max_active_tasks* set to 5
>
> The current code will
>
>    - get 32 tasks from *dag1*
>    - start examining them one by one
>    - once 5 are moved to *QUEUED*, it won't stop, it will keep examining
>    the other 27 but won't be able to queue them because it has reached the
>    limit
>
> In the next loop, although we have reached the maximum number of tasks for
> *dag1*, the query will fetch again 32 tasks from *dag1* to examine them and
> try to queue them.
>
> The issue is that it gets more tasks than it can queue from the db and then
> examines them all.
>
> This all leads to unnecessary processing that builds up and the more load
> there is on the system, the more the throughput drops for the scheduler and
> the workers.
>
> What I'm proposing is to adjust the query in step 2, to check the
> *max_active_tasks_per_dag*
>
> >  run a query to fetch SCHEDULED tasks from the db
>
>
> If a dag has already reached the maximum number of tasks in active states,
> it will be skipped by the query.
>
> > Don't we already stop examining at that point?   I guess there's two things
> > you might be referring to.  One is, which TIs come out of the db and into
> > python, and the other is, what we do in python.  Just might be helpful to
> > be clear about the specific enhancements & changes you are making.
>
>
> I think that if we adjust the query and fetch the right number of tasks,
> then we won't have to make changes to what is done in python.
>
>
>
>
> On Mon, Aug 4, 2025 at 8:01 AM Daniel Standish
> <daniel.stand...@astronomer.io.invalid> wrote:
>
> > @Christos Bisias
> >
> > > If you have a very large dag, and its tasks have been scheduled, then the
> > > scheduler will keep examining the tasks for queueing, even if it has
> > > reached the maximum number of active tasks for that particular dag. Once
> > > that fails, then it will move on to examine the scheduled tasks of the next
> > > dag or dag_run in line.
> >
> >
> > Can you make this a little more precise?  There's some protection against
> > "starvation" i.e. dag runs recently considered should go to the back of the
> > line next time.
> >
> > Maybe you could clarify why / how that's not working / not optimal / how to
> > improve.
> >
> > > If there are available slots in the pool and
> > > the max parallelism hasn't been reached yet, then the scheduler should stop
> > > processing a dag that has already reached its max capacity of active tasks.
> >
> >
> > If a dag run (or dag) is already at max capacity, it doesn't really matter
> > if there are slots available or parallelism isn't reached -- shouldn't it
> > stop anyway?
> >
> > > In addition, the number of scheduled tasks picked for examining, should be
> > > capped at the number of max active tasks if that's lower than the query
> > > limit. If the active limit is 10 and we already have 5 running, then we can
> > > queue at most 5 tasks. In that case, we shouldn't examine more than that.
> >
> >
> > Don't we already stop examining at that point?   I guess there's two things
> > you might be referring to.  One is, which TIs come out of the db and into
> > python, and the other is, what we do in python.  Just might be helpful to
> > be clear about the specific enhancements & changes you are making.
> >
> > > There is already a patch with the changes mentioned above. IMO, these
> > > changes should be enabled/disabled with a config flag and not by default
> > > because not everyone has the same needs as us. In our testing, adding a
> > > limit on the tasks retrieved from the db requires more processing on the
> > > query which actually makes things worse when you have multiple small dags.
> >
> >
> > I would like to see a stronger case made for configurability.  Why make it
> > configurable?  If the performance is always better, it should not be made
> > configurable, unless it's merely released as an opt-in experimental
> > feature.  If it is worse in some profiles, let's be clear about that.
> >
> > I did not read anything after `Here is a simple test case that makes the
> > benefits of the improvements noticeable` because it seemed a rather
> > long-winded description of a test case.  A higher-level summary might be
> > helpful to your audience.  Is there a PR with your optimization?  You wrote
> > "there is a patch" but did not, unless I missed something, share it.  I
> > would take a look if you share it though.
> >
> > Thanks
> >
> >
> >
> >
> >
> >
> >
> > On Sun, Aug 3, 2025 at 5:08 PM Daniel Standish <
> > daniel.stand...@astronomer.io> wrote:
> >
> > > Yes, the UI is another part of this.
> > >
> > > At some point the grid and graph views completely stop making sense for
> > > that volume, and another type of view would be required both for usability
> > > and performance.
> > >
> > >
> > >
> > > On Sun, Aug 3, 2025 at 11:04 AM Jens Scheffler
> > <j_scheff...@gmx.de.invalid>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> We also have a current demand for a workflow that executes 10k to 100k
> > >> tasks. Together with @AutomationDev85 we are working on a local solution
> > >> because we also saw problems in the Scheduler that do not scale linearly
> > >> and are certainly not easy to fix. Our investigation also found other
> > >> problems to consider; for example, the UI will potentially have problems
> > >> as well.
> > >>
> > >> I am a bit sceptical that PR 49160 completely fixes the problems mentioned
> > >> here and made some comments. I do not want to stop enthusiasm to fix and
> > >> improve things but the Scheduler is quite complex and changes need to be
> > >> made with care.
> > >>
> > >> Actually I like the patch
> > >> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
> > >> as it just adds a limit that prevents the scheduler from focusing on only
> > >> one run. But the complexity is a bit big for a "patch" :-D
> > >>
> > >> I'd also propose atm the way that Jarek described and split-up the Dag
> > >> into multiple parts (divide and conquer) for the moment.
> > >>
> > >> Otherwise if there is a concrete demand on such large Dags... we maybe
> > >> need rather a broader initiative if we want to ensure 10k, 100k, 1M?
> > >> tasks are supported per Dag. Because depending on the magnitude we
> > >> strive for different approaches are needed.
> > >>
> > >> Jens
> > >>
> > >> On 03.08.25 16:33, Daniel Standish wrote:
> > >> > Definitely an area of the scheduler with some opportunity for
> > >> performance
> > >> > improvement.
> > >> >
> > >> > I would just mention that, you should also attempt to include some
> > >> > performance testing at load / scale because, window functions are
> > going
> > >> to
> > >> > be more expensive.
> > >> >
> > >> > What happens when you have many dags, many historical dag runs &
> TIs,
> > >> lots
> > >> > of stuff running concurrently.  You need to be mindful of the
> overall
> > >> > impact of such a change, and not look only at the time spent on
> > >> scheduling
> > >> > this particular dag.
> > >> >
> > >> > I did not look at the PRs yet, maybe you've covered this, but, it's
> > >> > important.
> > >> >
> > >> >
> > >> > On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias<
> christos...@gmail.com>
> > >> > wrote:
> > >> >
> > >> >> I'm going to review the PR code and test it more thoroughly before
> > >> leaving
> > >> >> a comment.
> > >> >>
> > >> >> This is my code for reference
> > >> >>
> > >> >>
> > >> >>
> > >>
> >
> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
> > >> >>
> > >> >> The current version is setting a limit per dag, across all
> dag_runs.
> > >> >>
> > >> >> Please correct me if I'm wrong, but the PR looks like it's changing
> > >> the way
> > >> >> that tasks are prioritized to avoid starvation. If that's the case,
> > >> I'm not
> > >> >> sure that this is the same issue. My proposal is that, if we have
> > >> reached
> > >> >> the max resources assigned to a dag, then stop processing its tasks
> > and
> > >> >> move on to the next one. I'm not changing how or which tasks are
> > >> picked.
> > >> >>
> > >> >> On Sun, Aug 3, 2025 at 3:23 PM asquator<asqua...@proton.me
> .invalid>
> > >> >> wrote:
> > >> >>
> > >> >>> Thank you for the feedback.
> > >> >>> Please describe the case with failing limit checks in the PR (the DAG's
> > >> >>> parameters, its tasks' parameters and what fails to be checked) and
> > >> >>> we'll try to fix it ASAP so that you can test it again. Let's continue
> > >> >>> the PR-related discussion in the PR itself.
> > >> >>>
> > >> >>> On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <
> > >> >>> christos...@gmail.com> wrote:
> > >> >>>
> > >> >>>> Thank you for bringing this PR to my attention.
> > >> >>>>
> > >> >>>> I haven't studied the code but I ran a quick test on the branch and this
> > >> >>>> completely ignores the limit on scheduled tasks per dag or dag_run. It
> > >> >>>> grabbed 70 tasks from the first dag and then moved all 70 to QUEUED
> > >> >>>> without any further checks.
> > >> >>>>
> > >> >>>> This is how I tested it
> > >> >>>>
> > >> >>
> > >>
> >
> https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
> > >> >>>> On Sun, Aug 3, 2025 at 1:44 PM asquatorasqua...@proton.me
> .invalid
> > >> >>> wrote:
> > >> >>>>> Hello,
> > >> >>>>>
> > >> >>>>> This is a known issue stemming from the optimistic scheduling strategy
> > >> >>>>> used in Airflow. We do address this in the above-mentioned PR. I want to
> > >> >>>>> note that there are many cases where this problem may appear. It was
> > >> >>>>> originally detected with pools, but we are striving to fix it in all
> > >> >>>>> cases, such as the one described here with max_active_tis_per_dag, by
> > >> >>>>> switching to pessimistic scheduling with SQL window functions. While the
> > >> >>>>> current strategy simply pulls the max_tis tasks and drops the ones that
> > >> >>>>> do not meet the constraints, the new strategy will pull only the tasks
> > >> >>>>> that are actually ready to be scheduled and that comply with all
> > >> >>>>> concurrency limits.
> > >> >>>>> It would be very helpful for pushing this change to production if you
> > >> >>>>> could assist us in alpha-testing it.
> > >> >>>>>
> > >> >>>>> See also:
> > >> >>>>> https://github.com/apache/airflow/discussions/49160
> > >> >>>>>
> > >> >>>>> Sent with Proton Mail secure email.
> > >> >>>>>
> > >> >>>>> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif
> > >> >> elad...@apache.org
> > >> >>>>> wrote:
> > >> >>>>>
> > >> >>>>>> i think most of your issues will be addressed by
> > >> >>>>>> https://github.com/apache/airflow/pull/53492
> > >> >>>>>> The PR code can be tested with Breeze so you can set it up and
> > see
> > >> >>> if it
> > >> >>>>>> solves the problem this will also help with confirming it's the
> > >> >> right
> > >> >>>>>> fix.
> > >> >>>>>>
> > >> >>>>>> On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias
> > >> >>> christos...@gmail.com
> > >> >>>>>> wrote:
> > >> >>>>>>
> > >> >>>>>>> Hello,
> > >> >>>>>>>
> > >> >>>>>>> The scheduler is very efficient when running a large number of dags
> > >> >>>>>>> with up to 1000 tasks each. But in our case, we have dags with as many
> > >> >>>>>>> as 10,000 tasks. And in that scenario the scheduler and worker
> > >> >>>>>>> throughput drops significantly. Even if you have 1 such large dag with
> > >> >>>>>>> scheduled tasks, the performance hit becomes noticeable.
> > >> >>>>>>>
> > >> >>>>>>> We did some digging and we found that the issue comes from the
> > >> >>>>>>> scheduler's _executable_task_instances_to_queued
> > >> >>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647>
> > >> >>>>>>> method. In particular with the db query here
> > >> >>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375>
> > >> >>>>>>> and examining the results here
> > >> >>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>.
> > >> >>>>>>>
> > >> >>>>>>> If you have a very large dag, and its tasks have been scheduled, then
> > >> >>>>>>> the scheduler will keep examining the tasks for queueing, even if it
> > >> >>>>>>> has reached the maximum number of active tasks for that particular dag.
> > >> >>>>>>> Once that fails, then it will move on to examine the scheduled tasks of
> > >> >>>>>>> the next dag or dag_run in line.
> > >> >>>>>>>
> > >> >>>>>>> This is inefficient and causes the throughput of the scheduler and the
> > >> >>>>>>> workers to drop significantly. If there are available slots in the pool
> > >> >>>>>>> and the max parallelism hasn't been reached yet, then the scheduler
> > >> >>>>>>> should stop processing a dag that has already reached its max capacity
> > >> >>>>>>> of active tasks.
> > >> >>>>>>>
> > >> >>>>>>> In addition, the number of scheduled tasks picked for examining, should
> > >> >>>>>>> be capped at the number of max active tasks if that's lower than the
> > >> >>>>>>> query limit. If the active limit is 10 and we already have 5 running,
> > >> >>>>>>> then we can queue at most 5 tasks. In that case, we shouldn't examine
> > >> >>>>>>> more than that.
> > >> >>>>>>>
> > >> >>>>>>> There is already a patch with the changes mentioned above. IMO, these
> > >> >>>>>>> changes should be enabled/disabled with a config flag and not by
> > >> >>>>>>> default because not everyone has the same needs as us. In our testing,
> > >> >>>>>>> adding a limit on the tasks retrieved from the db requires more
> > >> >>>>>>> processing on the query which actually makes things worse when you have
> > >> >>>>>>> multiple small dags.
> > >> >>>>>>>
> > >> >>>>>>> Here is a simple test case that makes the benefits of the
> > >> >>> improvements
> > >> >>>>>>> noticeable
> > >> >>>>>>>
> > >> >>>>>>> - we have 3 dags with thousands of tasks each
> > >> >>>>>>> - for simplicity let's have 1 dag_run per dag
> > >> >>>>>>> - triggering them takes some time and due to that, the FIFO order of
> > >> >>>>>>>   the tasks is very clear
> > >> >>>>>>> - e.g. 1000 tasks from dag1 were scheduled first and then 200 tasks
> > >> >>>>>>>   from dag2 etc.
> > >> >>>>>>> - the executor has parallelism=100 and slots_available=100 which means
> > >> >>>>>>>   that it can run up to 100 tasks concurrently
> > >> >>>>>>> - max_active_tasks_per_dag is 4 which means that we can have up to 4
> > >> >>>>>>>   tasks running per dag.
> > >> >>>>>>> - For 3 dags, it means that we can run up to 12 tasks at the same
> > >> >>>>>>>   time (4 tasks from each dag)
> > >> >>>>>>> - max tis per query are set to 32, meaning that we can examine up to 32
> > >> >>>>>>>   scheduled tasks if there are available pool slots
> > >> >>>>>>>
> > >> >>>>>>> If we were to run the scheduler loop repeatedly until it
> queues
> > >> >> 12
> > >> >>>>>>> tasks
> > >> >>>>>>> and test the part that examines the scheduled tasks and queues
> > >> >>> them,
> > >> >>>>>>> then
> > >> >>>>>>>
> > >> >>>>>>> - with the query limit
> > >> >>>>>>>   - 1 iteration, total time 0.05
> > >> >>>>>>>   - During the iteration
> > >> >>>>>>>     - we have parallelism 100, available slots 100 and query limit 32
> > >> >>>>>>>       which means that it will examine up to 32 scheduled tasks
> > >> >>>>>>>     - it can queue up to 100 tasks
> > >> >>>>>>>     - examines 12 tasks (instead of 32)
> > >> >>>>>>>       - 4 tasks from dag1, reached max for the dag
> > >> >>>>>>>       - 4 tasks from dag2, reached max for the dag
> > >> >>>>>>>       - and 4 tasks from dag3, reached max for the dag
> > >> >>>>>>>     - queues 4 from dag1, reaches max for the dag and moves on
> > >> >>>>>>>     - queues 4 from dag2, reaches max for the dag and moves on
> > >> >>>>>>>     - queues 4 from dag3, reaches max for the dag and moves on
> > >> >>>>>>>     - stops queueing because we have reached the maximum per dag,
> > >> >>>>>>>       although there are slots for more tasks
> > >> >>>>>>>     - iteration finishes
> > >> >>>>>>> - without
> > >> >>>>>>>   - 3 iterations, total time 0.29
> > >> >>>>>>>   - During iteration 1
> > >> >>>>>>>     - Examines 32 tasks, all from dag1 (due to FIFO)
> > >> >>>>>>>     - queues 4 from dag1 and tries to queue the other 28 but fails
> > >> >>>>>>>   - During iteration 2
> > >> >>>>>>>     - examines the next 32 tasks from dag1
> > >> >>>>>>>     - it can't queue any of them because it has reached the max for
> > >> >>>>>>>       dag1, since the previous 4 are still running
> > >> >>>>>>>     - examines 32 tasks from dag2
> > >> >>>>>>>     - queues 4 from dag2 and tries to queue the other 28 but fails
> > >> >>>>>>>   - During iteration 3
> > >> >>>>>>>     - examines the next 32 tasks from dag1, same tasks that were
> > >> >>>>>>>       examined in iteration 2
> > >> >>>>>>>     - it can't queue any of them because it has reached the max for
> > >> >>>>>>>       dag1 and the first 4 are still running
> > >> >>>>>>>     - examines 32 tasks from dag2, can't queue any of them because
> > >> >>>>>>>       it has reached max for dag2 as well
> > >> >>>>>>>     - examines 32 tasks from dag3
> > >> >>>>>>>     - queues 4 from dag3 and tries to queue the other 28 but fails
> > >> >>>>>>>
> > >> >>>>>>> I used very low values for all the configs so that I can make the point
> > >> >>>>>>> clear and easy to understand. If we increase them, then this patch also
> > >> >>>>>>> makes the task selection more fair and the resource distribution more
> > >> >>>>>>> even.
> > >> >>>>>>>
> > >> >>>>>>> I would appreciate it if anyone familiar with the scheduler's code can
> > >> >>>>>>> confirm this and also provide any feedback.
> > >> >>>>>>>
> > >> >>>>>>> Additionally, I have one question regarding the query limit. Should it
> > >> >>>>>>> be per dag_run or per dag? I've noticed that max_active_tasks_per_dag
> > >> >>>>>>> has been changed to provide a value per dag_run but the docs haven't
> > >> >>>>>>> been updated.
> > >> >>>>>>>
> > >> >>>>>>> Thank you!
> > >> >>>>>>>
> > >> >>>>>>> Regards,
> > >> >>>>>>> Christos Bisias
> > >> >>>>>
> > >> ---------------------------------------------------------------------
> > >> >>>>> To unsubscribe, e-mail:dev-unsubscr...@airflow.apache.org
> > >> >>>>> For additional commands, e-mail:dev-h...@airflow.apache.org
> > >> >>>
> > >> >>>
> > >> >>>
> > >>
> > >
> >
>
