I believe our earlier answer for such cases was: why don't you split your Dags?

Generally speaking, when you write a system like Airflow you have to optimize for something. You cannot optimize for everything. I think Airflow is simply not optimized to handle Dags that have 10.000 tasks. This was not an "optimization goal". We had other goals when it comes to scheduling - optimizing for delays between tasks, for example - and likely a number of others for different parts of the system, including NOT optimizing our UI to display Dags that have 10.000 tasks.
And I think we don't have to. I would like to step back and ask why you have one Dag with 10.000 tasks rather than 10 Dags with 1000 tasks each, especially since asset-aware scheduling in Airflow is designed specifically to let you modularize and split Dags and have them depend on each other. I can't believe you cannot logically split your Dag into smaller ones and add dependencies between them.

This is very, very similar to refactoring in programming. Surely it's possible to write your application as a single file with 100.000 lines of code, but is it a good idea? We all probably know intuitively that while it is natural to start with a "single script", once it grows, splitting it into modularized, separate files (modules) or even folders (packages) and implementing the ways they interact (APIs) is a bit more demanding conceptually. It requires a bit of architecting and higher-level thinking, including some discipline in the way you write the modules - but eventually it is way, way, way better in multiple aspects: maintainability, scalability, distributability, etc. Similarly, splitting Dags into smaller ones and adding assets as an "API" between them is something we've been promoting for a while, and our UI has good (and in Airflow 3 even better) support for that.

So my question to you is: is it impossible, or just demanding or difficult, to split your Dags into smaller Dags connected with asset-aware scheduling? If it is not possible, can you explain why, so that maybe we can figure out what else we need to add to Airflow to make it possible?

And a general comment: as a long-term maintainer I've learned that we - maintainers as a group - sometimes have to decide NOT to implement something our users want, because it would be costly to maintain, or difficult, or risky. Instead, we have another tool in our toolbox: friction, and gently guiding our users to avoid that friction.
And sometimes this "friction" should be deliberate, and we should NOT remove it, because that friction makes users make choices that follow the directions we set. In this particular case, NOT supporting Dags with 10.000 tasks might be seen as deliberate friction which we might simply not want to fix, because it might guide our users to make better (from our point of view) choices and, for example, split their Dags.

So ... what do you say, Christos - is our friction enough for you and your company to think about splitting the Dags?

J.

On Sun, Aug 3, 2025 at 2:57 PM Christos Bisias <christos...@gmail.com> wrote:

> I'm going to review the PR code and test it more thoroughly before leaving a comment.
>
> This is my code for reference
>
> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>
> The current version is setting a limit per dag, across all dag_runs.
>
> Please correct me if I'm wrong, but the PR looks like it's changing the way that tasks are prioritized to avoid starvation. If that's the case, I'm not sure that this is the same issue. My proposal is that, if we have reached the max resources assigned to a dag, then stop processing its tasks and move on to the next one. I'm not changing how or which tasks are picked.
>
> On Sun, Aug 3, 2025 at 3:23 PM asquator <asqua...@proton.me.invalid> wrote:
>
> > Thank you for the feedback.
> > Please describe the case with failing limit checks in the PR (the DAG's parameters, its tasks' parameters, and what fails to be checked) and we'll try to fix it ASAP so that you can test it again. Let's continue the PR-related discussion in the PR itself.
> >
> > On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <christos...@gmail.com> wrote:
> >
> > > Thank you for bringing this PR to my attention.
> > >
> > > I haven't studied the code but I ran a quick test on the branch and this completely ignores the limit on scheduled tasks per dag or dag_run. It grabbed 70 tasks from the first dag and then moved all 70 to QUEUED without any further checks.
> > >
> > > This is how I tested it
> > >
> > > https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
> > >
> > > On Sun, Aug 3, 2025 at 1:44 PM asquator asqua...@proton.me.invalid wrote:
> > >
> > > > Hello,
> > > >
> > > > This is a known issue stemming from the optimistic scheduling strategy used in Airflow. We do address this in the above-mentioned PR. I want to note that there are many cases where this problem may appear - it was originally detected with pools, but we are striving to fix it in all cases, such as the one described here with max_active_tis_per_dag, by switching to pessimistic scheduling with SQL window functions. While the current strategy simply pulls the max_tis tasks and drops the ones that do not meet the constraints, the new strategy will pull only the tasks that are actually ready to be scheduled and that comply with all concurrency limits.
> > > >
> > > > It would be very helpful for pushing this change to production if you could assist us in alpha-testing it.
> > > >
> > > > See also:
> > > > https://github.com/apache/airflow/discussions/49160
> > > >
> > > > On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif elad...@apache.org wrote:
> > > >
> > > > > i think most of your issues will be addressed by https://github.com/apache/airflow/pull/53492
> > > > > The PR code can be tested with Breeze so you can set it up and see if it solves the problem this will also help with confirming it's the right fix.
> > > > >
> > > > > On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias christos...@gmail.com wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > The scheduler is very efficient when running a large amount of dags with up to 1000 tasks each. But in our case, we have dags with as many as 10.000 tasks. And in that scenario the scheduler and worker throughput drops significantly. Even if you have 1 such large dag with scheduled tasks, the performance hit becomes noticeable.
> > > > > >
> > > > > > We did some digging and we found that the issue comes from the scheduler's _executable_task_instances_to_queued <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647> method. In particular with the db query here <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375> and examining the results here <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>.
> > > > > >
> > > > > > If you have a very large dag, and its tasks have been scheduled, then the scheduler will keep examining the tasks for queueing, even if it has reached the maximum number of active tasks for that particular dag. Once that fails, then it will move on to examine the scheduled tasks of the next dag or dag_run in line.
> > > > > >
> > > > > > This is inefficient and causes the throughput of the scheduler and the workers to drop significantly. If there are available slots in the pool and the max parallelism hasn't been reached yet, then the scheduler should stop processing a dag that has already reached its max capacity of active tasks.
> > > > > >
> > > > > > In addition, the number of scheduled tasks picked for examining, should be capped at the number of max active tasks if that's lower than the query limit. If the active limit is 10 and we already have 5 running, then we can queue at most 5 tasks. In that case, we shouldn't examine more than that.
> > > > > >
> > > > > > There is already a patch with the changes mentioned above. IMO, these changes should be enabled/disabled with a config flag and not by default because not everyone has the same needs as us. In our testing, adding a limit on the tasks retrieved from the db requires more processing on the query which actually makes things worse when you have multiple small dags.
> > > > > >
> > > > > > Here is a simple test case that makes the benefits of the improvements noticeable
> > > > > >
> > > > > > - we have 3 dags with thousands of tasks each
> > > > > >   - for simplicity let's have 1 dag_run per dag
> > > > > > - triggering them takes some time and due to that, the FIFO order of the tasks is very clear
> > > > > >   - e.g. 1000 tasks from dag1 were scheduled first and then 200 tasks from dag2 etc.
> > > > > > - the executor has parallelism=100 and slots_available=100 which means that it can run up to 100 tasks concurrently
> > > > > > - max_active_tasks_per_dag is 4 which means that we can have up to 4 tasks running per dag.
> > > > > >   - For 3 dags, it means that we can run up to 12 tasks at the same time (4 tasks from each dag)
> > > > > > - max tis per query are set to 32, meaning that we can examine up to 32 scheduled tasks if there are available pool slots
> > > > > >
> > > > > > If we were to run the scheduler loop repeatedly until it queues 12 tasks and test the part that examines the scheduled tasks and queues them, then
> > > > > >
> > > > > > - with the query limit
> > > > > >   - 1 iteration, total time 0.05
> > > > > >   - During the iteration
> > > > > >     - we have parallelism 100, available slots 100 and query limit 32 which means that it will examine up to 32 scheduled tasks
> > > > > >     - it can queue up to 100 tasks
> > > > > >     - examines 12 tasks (instead of 32)
> > > > > >       - 4 tasks from dag1, reached max for the dag
> > > > > >       - 4 tasks from dag2, reached max for the dag
> > > > > >       - and 4 tasks from dag3, reached max for the dag
> > > > > >     - queues 4 from dag1, reaches max for the dag and moves on
> > > > > >     - queues 4 from dag2, reaches max for the dag and moves on
> > > > > >     - queues 4 from dag3, reaches max for the dag and moves on
> > > > > >     - stops queueing because we have reached the maximum per dag, although there are slots for more tasks
> > > > > >     - iteration finishes
> > > > > > - without
> > > > > >   - 3 iterations, total time 0.29
> > > > > >   - During iteration 1
> > > > > >     - Examines 32 tasks, all from dag1 (due to FIFO)
> > > > > >     - queues 4 from dag1 and tries to queue the other 28 but fails
> > > > > >   - During iteration 2
> > > > > >     - examines the next 32 tasks from dag1
> > > > > >       - it can't queue any of them because it has reached the max for dag1, since the previous 4 are still running
> > > > > >     - examines 32 tasks from dag2
> > > > > >     - queues 4 from dag2 and tries to queue the other 28 but fails
> > > > > >   - During iteration 3
> > > > > >     - examines the next 32 tasks from dag1, same tasks that were examined in iteration 2
> > > > > >       - it can't queue any of them because it has reached the max for dag1 and the first 4 are still running
> > > > > >     - examines 32 tasks from dag2, can't queue any of them because it has reached max for dag2 as well
> > > > > >     - examines 32 tasks from dag3
> > > > > >     - queues 4 from dag3 and tries to queue the other 28 but fails
> > > > > >
> > > > > > I used very low values for all the configs so that I can make the point clear and easy to understand. If we increase them, then this patch also makes the task selection more fair and the resource distribution more even.
> > > > > >
> > > > > > I would appreciate it if anyone familiar with the scheduler's code can confirm this and also provide any feedback.
> > > > > >
> > > > > > Additionally, I have one question regarding the query limit. Should it be per dag_run or per dag? I've noticed that max_active_tasks_per_dag has been changed to provide a value per dag_run but the docs haven't been updated.
> > > > > >
> > > > > > Thank you!
> > > > > >
> > > > > > Regards,
> > > > > > Christos Bisias
> > > >
> > > > ---------------------------------------------------------------------
> > > > To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
> > > > For additional commands, e-mail: dev-h...@airflow.apache.org
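
For readers who want to reproduce the iteration counts from the test case in the first message of this thread, here is a rough toy model in plain Python. It is only a sketch under stated assumptions and not Airflow's scheduler code: the function name `simulate`, the `cap_per_dag` flag, the 200 tasks assumed for dag3, and the "re-query while skipping dags that already hit their limit" behaviour within one iteration are all illustrative guesses chosen to match the description in the message. Pools and executor parallelism are left out because, at 100 slots, they never become the limiting factor in this scenario.

```python
from collections import Counter


def simulate(scheduled, max_active_per_dag, query_limit, target, cap_per_dag):
    """Toy model of the queueing loop. Returns (iterations, tasks_examined)."""
    pending = list(scheduled)   # FIFO list of (dag_id, task_number)
    active = Counter()          # tasks already queued/running, per dag
    iterations = examined = 0
    while sum(active.values()) < target:
        iterations += 1
        starved = set()         # dags found at their limit during this iteration
        progressed = False
        while not progressed:
            # "Query": fetch scheduled tasks in FIFO order, skipping starved dags.
            batch, taken = [], Counter()
            for dag, num in pending:
                if dag in starved:
                    continue
                # Proposed cap: fetch no more than the dag's remaining capacity.
                if cap_per_dag and active[dag] + taken[dag] >= max_active_per_dag:
                    continue
                batch.append((dag, num))
                taken[dag] += 1
                if len(batch) == query_limit:
                    break
            if not batch:
                break
            # Examine every fetched task; queue it only if its dag has room.
            for dag, num in batch:
                examined += 1
                if active[dag] < max_active_per_dag:
                    active[dag] += 1
                    pending.remove((dag, num))
                    progressed = True
                else:
                    starved.add(dag)
        if not progressed:
            break               # nothing left that can be queued
    return iterations, examined


# FIFO order from the test case; the 200 tasks for dag3 are an assumption.
fifo = [("dag1", i) for i in range(1000)]
fifo += [("dag2", i) for i in range(200)]
fifo += [("dag3", i) for i in range(200)]

without_cap = simulate(fifo, 4, 32, 12, cap_per_dag=False)
with_cap = simulate(fifo, 4, 32, 12, cap_per_dag=True)
print(without_cap)  # (3, 192)
print(with_cap)     # (1, 12)
```

In this model the uncapped run needs 3 iterations and examines 192 task instances to queue 12, while the capped run needs 1 iteration and examines 12, which matches the iteration counts quoted in the message. The model says nothing about the extra database cost of the capped query, which the message reports as a real downside when there are many small dags.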