This is definitely an area of the scheduler with some opportunity for
performance improvement.

I would just mention that you should also include some performance testing
at load / scale, because window functions are going to be more expensive.

What happens when you have many dags, many historical dag runs & TIs, and
lots of work running concurrently?  You need to be mindful of the overall
impact of such a change, and not look only at the time spent on scheduling
this particular dag.
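To make that concrete, here is one rough way to compare a plain LIMIT fetch
against a window-function fetch as row counts grow. This is a standalone
sketch against a synthetic SQLite table; the table layout, column names, and
limits are invented for illustration and are not Airflow's real schema or
query.

```python
import sqlite3
import time

# Synthetic task_instance table: 50 dags x 2000 scheduled TIs each.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (dag_id TEXT, ti_id INTEGER, state TEXT)")
rows = [(f"dag{d}", i, "scheduled") for d in range(50) for i in range(2000)]
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?)", rows)

MAX_TIS, PER_DAG_CAP = 32, 4  # illustrative values only

# Current-style fetch: grab the first MAX_TIS scheduled rows, FIFO.
t0 = time.perf_counter()
plain = conn.execute(
    "SELECT dag_id, ti_id FROM task_instance WHERE state = 'scheduled' "
    "ORDER BY ti_id LIMIT ?", (MAX_TIS,)
).fetchall()
t1 = time.perf_counter()

# Window-function fetch: at most PER_DAG_CAP rows per dag, then LIMIT.
windowed = conn.execute(
    """
    SELECT dag_id, ti_id FROM (
        SELECT dag_id, ti_id,
               ROW_NUMBER() OVER (PARTITION BY dag_id ORDER BY ti_id) AS rn
        FROM task_instance WHERE state = 'scheduled'
    ) WHERE rn <= ? ORDER BY ti_id LIMIT ?
    """, (PER_DAG_CAP, MAX_TIS),
).fetchall()
t2 = time.perf_counter()

print(f"plain:    {len(plain)} rows in {t1 - t0:.4f}s")
print(f"windowed: {len(windowed)} rows in {t2 - t1:.4f}s")
```

The window-function variant has to number rows across every partition before
it can apply the cap, which is exactly why its cost should be measured with
many dags and many historical rows, not just one.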

I have not looked at the PRs yet, so maybe you've covered this, but it's
important.
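For readers skimming the thread: the per-dag capping idea discussed below
(stop examining a dag's tasks once its remaining active-task capacity is
exhausted) might be sketched in plain Python as follows. The function and
its parameter names are hypothetical, not the scheduler's actual code; the
numbers mirror the 3-dag example later in the thread.

```python
from collections import defaultdict

def select_tasks(scheduled, running_per_dag, per_dag_cap, pool_slots, query_limit):
    """Pick tasks FIFO, but skip a dag once its remaining per-dag
    capacity is exhausted. Purely illustrative, not the real scheduler query."""
    picked = []
    # Remaining capacity per dag: cap minus whatever is already running.
    remaining = defaultdict(
        lambda: per_dag_cap,
        {d: per_dag_cap - r for d, r in running_per_dag.items()},
    )
    examined = 0
    for dag_id, task_id in scheduled:  # backlog is in FIFO order
        if len(picked) >= pool_slots or examined >= query_limit:
            break
        if remaining[dag_id] <= 0:
            continue  # dag at capacity: skip without spending examination budget
        examined += 1
        picked.append((dag_id, task_id))
        remaining[dag_id] -= 1
    return picked

# The thread's scenario: 3 dags with a FIFO backlog, cap of 4 per dag,
# 100 pool slots, query limit 32.
backlog = [("dag1", i) for i in range(1000)] + \
          [("dag2", i) for i in range(1000)] + \
          [("dag3", i) for i in range(1000)]
queued = select_tasks(backlog, {}, per_dag_cap=4, pool_slots=100, query_limit=32)
print(len(queued))  # 12: 4 from each dag, in a single pass
```

With the cap, one pass queues 4 tasks from each dag instead of burning the
whole 32-task examination budget on the first dag's backlog.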


On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias <christos...@gmail.com>
wrote:

> I'm going to review the PR code and test it more thoroughly before leaving
> a comment.
>
> This is my code, for reference:
>
> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>
> The current version sets a limit per dag, across all dag_runs.
>
> Please correct me if I'm wrong, but the PR looks like it changes the way
> tasks are prioritized to avoid starvation. If that's the case, I'm not
> sure this is the same issue. My proposal is that, once we have reached
> the max resources assigned to a dag, we stop processing its tasks and
> move on to the next one. I'm not changing how or which tasks are picked.
>
> On Sun, Aug 3, 2025 at 3:23 PM asquator <asqua...@proton.me.invalid>
> wrote:
>
> > Thank you for the feedback.
> > Please describe the case with the failing limit checks in the PR (the
> > DAG's parameters, its tasks' parameters, and what fails to be checked)
> > and we'll try to fix it ASAP so that you can test it again. Let's
> > continue the PR-related discussion in the PR itself.
> >
> > On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <
> > christos...@gmail.com> wrote:
> >
> > > Thank you for bringing this PR to my attention.
> > >
> > > I haven't studied the code, but I ran a quick test on the branch and it
> > > completely ignores the limit on scheduled tasks per dag or dag_run. It
> > > grabbed 70 tasks from the first dag and then moved all 70 to QUEUED
> > > without any further checks.
> > >
> > > This is how I tested it:
> > >
> > > https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
> > >
> > > On Sun, Aug 3, 2025 at 1:44 PM asquator asqua...@proton.me.invalid
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > This is a known issue stemming from the optimistic scheduling strategy
> > > > used in Airflow. We do address this in the above-mentioned PR. I want
> > > > to note that there are many cases where this problem may appear: it
> > > > was originally detected with pools, but we are striving to fix it in
> > > > all cases, such as the one described here with max_active_tis_per_dag,
> > > > by switching to pessimistic scheduling with SQL window functions.
> > > > While the current strategy simply pulls the max_tis tasks and drops
> > > > the ones that do not meet the constraints, the new strategy will pull
> > > > only the tasks that are actually ready to be scheduled and that comply
> > > > with all concurrency limits.
> > > >
> > > > It would be very helpful for pushing this change to production if you
> > > > could assist us in alpha-testing it.
> > > >
> > > > See also:
> > > > https://github.com/apache/airflow/discussions/49160
> > > >
> > > > Sent with Proton Mail secure email.
> > > >
> > > > On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif
> elad...@apache.org
> > > > wrote:
> > > >
> > > > > I think most of your issues will be addressed by
> > > > > https://github.com/apache/airflow/pull/53492
> > > > > The PR code can be tested with Breeze, so you can set it up and see
> > > > > if it solves the problem. This will also help confirm that it's the
> > > > > right fix.
> > > > >
> > > > > On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias
> > christos...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > The scheduler is very efficient when running a large number of
> > > > > > dags with up to 1000 tasks each. But in our case, we have dags
> > > > > > with as many as 10,000 tasks, and in that scenario the scheduler
> > > > > > and worker throughput drops significantly. Even a single such
> > > > > > large dag with scheduled tasks causes a noticeable performance
> > > > > > hit.
> > > > > >
> > > > > > We did some digging and we found that the issue comes from the
> > > > > > scheduler's
> > > > > > _executable_task_instances_to_queued
> > > > > > <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647>
> > > > > > method.
> > > > > > In particular with the db query here
> > > > > > <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375>
> > > > > > and
> > > > > > examining the results here
> > > > > > <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>.
> > > > > >
> > > > > > If you have a very large dag and its tasks have been scheduled,
> > > > > > the scheduler will keep examining those tasks for queueing, even
> > > > > > if it has already reached the maximum number of active tasks for
> > > > > > that particular dag. Only once that fails will it move on to
> > > > > > examine the scheduled tasks of the next dag or dag_run in line.
> > > > > >
> > > > > > This is inefficient and causes the throughput of the scheduler
> > > > > > and the workers to drop significantly. If there are available
> > > > > > slots in the pool and the max parallelism hasn't been reached yet,
> > > > > > the scheduler should stop processing a dag that has already
> > > > > > reached its max capacity of active tasks.
> > > > > >
> > > > > > In addition, the number of scheduled tasks picked for examination
> > > > > > should be capped at the number of max active tasks if that's lower
> > > > > > than the query limit. If the active limit is 10 and we already
> > > > > > have 5 running, then we can queue at most 5 more tasks, so we
> > > > > > shouldn't examine more than that.
> > > > > >
> > > > > > There is already a patch with the changes mentioned above. IMO,
> > > > > > these changes should be toggled with a config flag rather than
> > > > > > enabled by default, because not everyone has the same needs as us.
> > > > > > In our testing, adding a limit on the tasks retrieved from the db
> > > > > > requires more processing in the query, which actually makes things
> > > > > > worse when you have many small dags.
> > > > > >
> > > > > > Here is a simple test case that makes the benefits of the
> > > > > > improvements noticeable:
> > > > > >
> > > > > > - we have 3 dags with thousands of tasks each
> > > > > > - for simplicity, let's have 1 dag_run per dag
> > > > > > - triggering them takes some time and, due to that, the FIFO
> > > > > > order of the tasks is very clear
> > > > > > - e.g. 1000 tasks from dag1 were scheduled first, then 200 tasks
> > > > > > from dag2, etc.
> > > > > > - the executor has parallelism=100 and slots_available=100, which
> > > > > > means it can run up to 100 tasks concurrently
> > > > > > - max_active_tasks_per_dag is 4, which means we can have up to 4
> > > > > > tasks running per dag
> > > > > > - for 3 dags, that means we can run up to 12 tasks at the same
> > > > > > time (4 tasks from each dag)
> > > > > > - max tis per query is set to 32, meaning we can examine up to 32
> > > > > > scheduled tasks if there are available pool slots
> > > > > >
> > > > > > If we were to run the scheduler loop repeatedly until it queues
> > > > > > 12 tasks, and test the part that examines the scheduled tasks and
> > > > > > queues them, then:
> > > > > >
> > > > > > - with the query limit
> > > > > >   - 1 iteration, total time 0.05
> > > > > >   - during the iteration
> > > > > >     - we have parallelism 100, available slots 100 and query
> > > > > >       limit 32, which means it will examine up to 32 scheduled
> > > > > >       tasks
> > > > > >     - it can queue up to 100 tasks
> > > > > >     - examines 12 tasks (instead of 32)
> > > > > >       - 4 tasks from dag1, reached max for the dag
> > > > > >       - 4 tasks from dag2, reached max for the dag
> > > > > >       - 4 tasks from dag3, reached max for the dag
> > > > > >     - queues 4 from dag1, reaches max for the dag and moves on
> > > > > >     - queues 4 from dag2, reaches max for the dag and moves on
> > > > > >     - queues 4 from dag3, reaches max for the dag and moves on
> > > > > >     - stops queueing because we have reached the maximum per dag,
> > > > > >       although there are slots for more tasks
> > > > > >   - iteration finishes
> > > > > > - without the query limit
> > > > > >   - 3 iterations, total time 0.29
> > > > > >   - during iteration 1
> > > > > >     - examines 32 tasks, all from dag1 (due to FIFO)
> > > > > >     - queues 4 from dag1 and tries to queue the other 28 but fails
> > > > > >   - during iteration 2
> > > > > >     - examines the next 32 tasks from dag1
> > > > > >     - can't queue any of them because it has reached the max for
> > > > > >       dag1, since the previous 4 are still running
> > > > > >     - examines 32 tasks from dag2
> > > > > >     - queues 4 from dag2 and tries to queue the other 28 but fails
> > > > > >   - during iteration 3
> > > > > >     - examines the next 32 tasks from dag1, the same tasks that
> > > > > >       were examined in iteration 2
> > > > > >     - can't queue any of them because it has reached the max for
> > > > > >       dag1 and the first 4 are still running
> > > > > >     - examines 32 tasks from dag2, can't queue any of them because
> > > > > >       it has reached max for dag2 as well
> > > > > >     - examines 32 tasks from dag3
> > > > > >     - queues 4 from dag3 and tries to queue the other 28 but fails
> > > > > >
> > > > > > I used very low values for all the configs to make the point
> > > > > > clear and easy to understand. If we increase them, this patch
> > > > > > also makes the task selection fairer and the resource
> > > > > > distribution more even.
> > > > > >
> > > > > > I would appreciate it if anyone familiar with the scheduler's
> > > > > > code could confirm this and provide feedback.
> > > > > >
> > > > > > Additionally, I have one question regarding the query limit:
> > > > > > should it be per dag_run or per dag? I've noticed that
> > > > > > max_active_tasks_per_dag has been changed to provide a value per
> > > > > > dag_run, but the docs haven't been updated.
> > > > > >
> > > > > > Thank you!
> > > > > >
> > > > > > Regards,
> > > > > > Christos Bisias
> > > >
> > > > ---------------------------------------------------------------------
> > > > To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
> > > > For additional commands, e-mail: dev-h...@airflow.apache.org
> >
> >
> >
>
