Hi,

We also have a current demand for a workflow that executes 10k to 100k tasks. Together with @AutomationDev85 I am working on a local solution, because we also saw problems in the Scheduler that do not scale linearly, and which are certainly not easy to fix. From our investigation there are other problems to consider as well; for example, the UI will also potentially have problems.

I am a bit skeptical that PR 49160 completely fixes the problems mentioned here, and I made some comments. I do not want to dampen the enthusiasm to fix and improve things, but the Scheduler is quite complex and changes need to be made with care.

Actually I like the patch https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch as it just adds a limit preventing the scheduler from focusing on only one run. But the complexity is a bit big for a "patch" :-D

For the moment I'd also propose the approach that Jarek described and split up the Dag into multiple parts (divide and conquer).

Otherwise, if there is a concrete demand for such large Dags... we may rather need a broader initiative if we want to ensure 10k, 100k, 1M? tasks are supported per Dag. Depending on the magnitude we strive for, different approaches are needed.

Jens

On 03.08.25 16:33, Daniel Standish wrote:
Definitely an area of the scheduler with some opportunity for performance improvement.

I would just mention that you should also attempt to include some performance testing at load/scale, because window functions are going to be more expensive.

What happens when you have many dags, many historical dag runs & TIs, and lots of stuff running concurrently? You need to be mindful of the overall impact of such a change, and not look only at the time spent on scheduling this particular dag.

I did not look at the PRs yet (maybe you've covered this), but it's important.


On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias <christos...@gmail.com> wrote:

I'm going to review the PR code and test it more thoroughly before leaving a comment.

This is my code for reference:

https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch

The current version sets a limit per dag, across all dag_runs.

Please correct me if I'm wrong, but the PR looks like it's changing the way tasks are prioritized to avoid starvation. If that's the case, I'm not sure this is the same issue. My proposal is that, if we have reached the max resources assigned to a dag, we stop processing its tasks and move on to the next one. I'm not changing how or which tasks are picked.
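
To make the proposal concrete, here is a minimal sketch of the skip-at-capacity idea (names are invented for illustration; this is not the actual patch):

    # Hypothetical sketch: once a dag is at its max active tasks, skip the
    # rest of its scheduled tasks instead of examining each one and failing
    # the per-dag check repeatedly.
    from collections import Counter

    def pick_tis_to_queue(scheduled_tis, active_per_dag: Counter, max_active: int):
        picked = []
        for ti in scheduled_tis:  # already in priority/FIFO order
            if active_per_dag[ti.dag_id] >= max_active:
                continue  # dag at capacity: move on to other dags' tasks
            picked.append(ti)
            active_per_dag[ti.dag_id] += 1
        return picked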

On Sun, Aug 3, 2025 at 3:23 PM asquator <asqua...@proton.me.invalid> wrote:

Thank you for the feedback. Please describe the case with the failing limit checks in the PR (the DAG's parameters, its tasks' parameters, and what fails to be checked) and we'll try to fix it ASAP so you can test it again. Let's continue the PR-related discussion in the PR itself.

On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <christos...@gmail.com> wrote:

Thank you for bringing this PR to my attention.

I haven't studied the code, but I ran a quick test on the branch and this completely ignores the limit on scheduled tasks per dag or dag_run. It grabbed 70 tasks from the first dag and then moved all 70 to QUEUED without any further checks.

This is how I tested it:

https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1

On Sun, Aug 3, 2025 at 1:44 PM asquator <asqua...@proton.me.invalid> wrote:
Hello,

This is a known issue stemming from the optimistic scheduling strategy used in Airflow. We do address this in the above-mentioned PR. I want to note that there are many cases where this problem may appear: it was originally detected with pools, but we are striving to fix it in all cases, such as the one described here with max_active_tis_per_dag, by switching to pessimistic scheduling with SQL window functions. While the current strategy simply pulls the max_tis tasks and drops the ones that do not meet the constraints, the new strategy will pull only the tasks that are actually ready to be scheduled and that comply with all concurrency limits.

It would be very helpful for pushing this change to production if you could assist us in alpha-testing it.
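
For illustration only, a window-function query of the kind described could look roughly like this in SQLAlchemy (a sketch under assumed limits; the PR's actual query is more involved):

    # Sketch: rank scheduled TIs within each dag run so that only rows
    # within the concurrency limit are pulled, instead of pulling max_tis
    # rows and dropping the ones that fail the checks afterwards.
    from sqlalchemy import func, select

    from airflow.models.taskinstance import TaskInstance as TI

    rn = (
        func.row_number()
        .over(
            partition_by=(TI.dag_id, TI.run_id),
            order_by=TI.priority_weight.desc(),
        )
        .label("rn")
    )
    ranked = (
        select(TI.dag_id, TI.run_id, TI.task_id, rn)
        .where(TI.state == "scheduled")
        .subquery()
    )
    # Assumed per-run cap of 4 and max_tis of 32, matching the example
    # numbers used later in this thread.
    query = select(ranked).where(ranked.c.rn <= 4).limit(32)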

See also:
https://github.com/apache/airflow/discussions/49160

Sent with Proton Mail secure email.

On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif <elad...@apache.org> wrote:

I think most of your issues will be addressed by https://github.com/apache/airflow/pull/53492. The PR code can be tested with Breeze, so you can set it up and see if it solves the problem; this will also help with confirming it's the right fix.

On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias <christos...@gmail.com> wrote:

Hello,

The scheduler is very efficient when running a large number of dags with up to 1000 tasks each. But in our case, we have dags with as many as 10,000 tasks, and in that scenario the scheduler and worker throughput drops significantly. Even if you have one such large dag with scheduled tasks, the performance hit becomes noticeable.

We did some digging and we found that the issue comes from the scheduler's _executable_task_instances_to_queued method
(https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647),
in particular the db query here:
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375
and the examination of the results here:
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425
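
As a rough mental model of that query (heavily simplified; the real version also filters on pools, dag_run state and more), the shape is:

    # Simplified sketch of the candidate query's shape: pull up to max_tis
    # scheduled task instances by priority, then examine them one by one.
    from sqlalchemy import select

    from airflow.models.taskinstance import TaskInstance as TI

    max_tis = 32  # [scheduler] max_tis_per_query

    candidates = (
        select(TI)
        .where(TI.state == "scheduled")
        .order_by(TI.priority_weight.desc())
        .limit(max_tis)
    )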

If you have a very large dag and its tasks have been scheduled, then the scheduler will keep examining the tasks for queueing, even if it has reached the maximum number of active tasks for that particular dag. Once that fails, it will move on to examine the scheduled tasks of the next dag or dag_run in line.

This is inefficient and causes the throughput of the scheduler and the workers to drop significantly. If there are available slots in the pool and the max parallelism hasn't been reached yet, then the scheduler should stop processing a dag that has already reached its max capacity of active tasks.

In addition, the number of scheduled tasks picked for examination should be capped at the number of max active tasks if that's lower than the query limit. If the active limit is 10 and we already have 5 running, then we can queue at most 5 tasks; in that case, we shouldn't examine more than that.
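
A one-line way to state that cap (a hypothetical helper, mirroring the numbers above):

    # Never examine more of a dag's tasks than it can actually still queue.
    def examination_limit(max_active: int, currently_active: int, query_limit: int) -> int:
        return min(max(max_active - currently_active, 0), query_limit)

    # Example from the text: limit 10, 5 already running, query limit 32 -> 5
    assert examination_limit(10, 5, 32) == 5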

There is already a patch with the changes mentioned above. IMO, these changes should be enabled/disabled with a config flag and not be on by default, because not everyone has the same needs as us. In our testing, adding a limit on the tasks retrieved from the db requires more processing in the query, which actually makes things worse when you have multiple small dags.

Here is a simple test case that makes the benefits of the improvements noticeable (a sketch of the corresponding config follows the list):

- we have 3 dags with thousands of tasks each
- for simplicity, let's have 1 dag_run per dag
- triggering them takes some time and due to that, the FIFO order of the tasks is very clear
  - e.g. 1000 tasks from dag1 were scheduled first and then 200 tasks from dag2 etc.
- the executor has parallelism=100 and slots_available=100, which means that it can run up to 100 tasks concurrently
- max_active_tasks_per_dag is 4, which means that we can have up to 4 tasks running per dag
  - for 3 dags, it means that we can run up to 12 tasks at the same time (4 tasks from each dag)
- max tis per query is set to 32, meaning that we can examine up to 32 scheduled tasks if there are available pool slots
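
These settings map onto standard Airflow config options; one way to pin them for such a test (shown here via their environment-variable form) would be:

    # Assumed scenario configuration; the option names are the standard
    # Airflow ones, the values are the low test values described above.
    import os

    os.environ["AIRFLOW__CORE__PARALLELISM"] = "100"             # executor parallelism
    os.environ["AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG"] = "4"  # running tasks per dag
    os.environ["AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY"] = "32"   # TIs examined per query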

If we were to run the scheduler loop repeatedly until it queues 12 tasks, and test the part that examines the scheduled tasks and queues them, then (a toy simulation of this walkthrough follows the list):

- with the query limit
  - 1 iteration, total time 0.05
  - during the iteration:
    - we have parallelism 100, available slots 100 and query limit 32, which means that it will examine up to 32 scheduled tasks
    - it can queue up to 100 tasks
    - examines 12 tasks (instead of 32):
      - 4 tasks from dag1, reached max for the dag
      - 4 tasks from dag2, reached max for the dag
      - and 4 tasks from dag3, reached max for the dag
    - queues 4 from dag1, reaches max for the dag and moves on
    - queues 4 from dag2, reaches max for the dag and moves on
    - queues 4 from dag3, reaches max for the dag and moves on
    - stops queueing because we have reached the maximum per dag, although there are slots for more tasks
    - iteration finishes
- without the query limit
  - 3 iterations, total time 0.29
  - during iteration 1:
    - examines 32 tasks, all from dag1 (due to FIFO)
    - queues 4 from dag1 and tries to queue the other 28 but fails
  - during iteration 2:
    - examines the next 32 tasks from dag1
    - it can't queue any of them because it has reached the max for dag1, since the previous 4 are still running
    - examines 32 tasks from dag2
    - queues 4 from dag2 and tries to queue the other 28 but fails
  - during iteration 3:
    - examines the next 32 tasks from dag1, the same tasks that were examined in iteration 2
    - it can't queue any of them because it has reached the max for dag1 and the first 4 are still running
    - examines 32 tasks from dag2, can't queue any of them because it has reached max for dag2 as well
    - examines 32 tasks from dag3
    - queues 4 from dag3 and tries to queue the other 28 but fails
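
To sanity-check the iteration counts, here is a toy simulation of the walkthrough (a deliberate simplification, not the scheduler's real code; the retry-on-starvation behaviour within an iteration is only approximated):

    # Toy model: 3 dags x 1000 FIFO-ordered scheduled tasks,
    # max_active_tasks_per_dag=4, max_tis_per_query=32. An iteration
    # re-queries (excluding starved dags) only until it manages to
    # queue something, roughly like the scheduler does.
    from collections import Counter

    MAX_ACTIVE, MAX_TIS, DAGS = 4, 32, ("dag1", "dag2", "dag3")

    def build_batch(scheduled, active, starved, cap_examined):
        batch, taken = [], Counter()
        for dag_id in scheduled:
            if len(batch) >= MAX_TIS:
                break
            if dag_id in starved:
                continue
            if cap_examined and active[dag_id] + taken[dag_id] >= MAX_ACTIVE:
                continue  # the patch: don't examine tasks of a capped dag
            batch.append(dag_id)
            taken[dag_id] += 1
        return batch

    def iterations_needed(cap_examined):
        scheduled = [d for d in DAGS for _ in range(1000)]  # FIFO order
        active, iterations = Counter(), 0
        while sum(active.values()) < MAX_ACTIVE * len(DAGS):
            iterations += 1
            starved = set()
            while True:
                batch = build_batch(scheduled, active, starved, cap_examined)
                queued = 0
                for dag_id in batch:
                    if active[dag_id] < MAX_ACTIVE:
                        active[dag_id] += 1
                        scheduled.remove(dag_id)
                        queued += 1
                    else:
                        starved.add(dag_id)
                if queued or not batch:
                    break  # stop re-querying once something was queued
        return iterations

    print(iterations_needed(cap_examined=True))   # -> 1 iteration
    print(iterations_needed(cap_examined=False))  # -> 3 iterations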

I used very low values for all the configs so that I can make the point clear and easy to understand. If we increase them, then this patch also makes the task selection more fair and the resource distribution more even.

I would appreciate it if anyone familiar with the scheduler's code can confirm this and also provide any feedback.

Additionally, I have one question regarding the query limit. Should it be per dag_run or per dag? I've noticed that max_active_tasks_per_dag has been changed to provide a value per dag_run, but the docs haven't been updated.

Thank you!

Regards,
Christos Bisias
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
For additional commands, e-mail: dev-h...@airflow.apache.org