This is definitely an area of the scheduler with some opportunity for performance improvement.
I would just mention that you should also include some performance testing at load/scale, because window functions are going to be more expensive. What happens when you have many dags, many historical dag runs and TIs, and lots of stuff running concurrently? You need to be mindful of the overall impact of such a change, and not look only at the time spent scheduling this particular dag. I did not look at the PRs yet, so maybe you've covered this, but it's important.

On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias <christos...@gmail.com> wrote:

> I'm going to review the PR code and test it more thoroughly before leaving a comment.
>
> This is my code for reference:
> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>
> The current version is setting a limit per dag, across all dag_runs.
>
> Please correct me if I'm wrong, but the PR looks like it's changing the way that tasks are prioritized to avoid starvation. If that's the case, I'm not sure that this is the same issue. My proposal is that, if we have reached the max resources assigned to a dag, then we stop processing its tasks and move on to the next one. I'm not changing how or which tasks are picked.
>
> On Sun, Aug 3, 2025 at 3:23 PM asquator <asqua...@proton.me.invalid> wrote:
>
> > Thank you for the feedback.
> > Please describe the case with the failing limit checks in the PR (the DAG's parameters, its tasks' parameters, and what fails to be checked) and we'll try to fix it ASAP before you test it again. Let's continue the PR-related discussion in the PR itself.
> >
> > On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias <christos...@gmail.com> wrote:
> >
> > > Thank you for bringing this PR to my attention.
> > >
> > > I haven't studied the code, but I ran a quick test on the branch and it completely ignores the limit on scheduled tasks per dag or dag_run. It grabbed 70 tasks from the first dag and then moved all 70 to QUEUED without any further checks.
> > >
> > > This is how I tested it:
> > > https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
> > >
> > > On Sun, Aug 3, 2025 at 1:44 PM asquator <asqua...@proton.me.invalid> wrote:
> > >
> > > > Hello,
> > > >
> > > > This is a known issue stemming from the optimistic scheduling strategy used in Airflow. We do address this in the above-mentioned PR. I want to note that there are many cases where this problem may appear: it was originally detected with pools, but we are striving to fix it in all cases, such as the one described here with max_active_tis_per_dag, by switching to pessimistic scheduling with SQL window functions. While the current strategy simply pulls the max_tis tasks and drops the ones that do not meet the constraints, the new strategy will pull only the tasks that are actually ready to be scheduled and that comply with all concurrency limits.
> > > >
> > > > It would be very helpful for pushing this change to production if you could assist us in alpha-testing it.
> > > >
> > > > See also:
> > > > https://github.com/apache/airflow/discussions/49160
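A minimal sketch of the "pessimistic" selection idea being discussed: a window function ranks scheduled task instances within each dag, so the query itself returns at most a few rows per dag. The table and column names below are simplified stand-ins, not Airflow's real schema, and this is not the query from the PR.

    # Sketch only: per-dag cap enforced inside the query via ROW_NUMBER().
    # Assumes SQLAlchemy; the schema is illustrative, not Airflow's actual one.
    from sqlalchemy import Column, Integer, MetaData, String, Table, func, select

    metadata = MetaData()
    task_instance = Table(
        "task_instance", metadata,
        Column("id", Integer, primary_key=True),
        Column("dag_id", String),
        Column("state", String),
        Column("priority_weight", Integer),
    )

    MAX_ACTIVE_TASKS_PER_DAG = 4   # illustrative value
    MAX_TIS_PER_QUERY = 32         # illustrative value

    # Rank scheduled TIs within each dag ...
    ranked = (
        select(
            task_instance.c.id,
            task_instance.c.dag_id,
            func.row_number()
            .over(
                partition_by=task_instance.c.dag_id,
                order_by=task_instance.c.priority_weight.desc(),
            )
            .label("rn"),
        )
        .where(task_instance.c.state == "scheduled")
        .subquery()
    )

    # ... and keep only the first few per dag, up to the overall query limit.
    per_dag_capped = (
        select(ranked.c.id)
        .where(ranked.c.rn <= MAX_ACTIVE_TASKS_PER_DAG)
        .limit(MAX_TIS_PER_QUERY)
    )

    print(per_dag_capped)  # prints the generated SQL containing the ROW_NUMBER() window

A real implementation would also have to subtract already running/queued TIs per dag and respect pool slots, and the window computation is paid on every scheduling query, which is why the load/scale testing mentioned above matters.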
> > > > On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif <elad...@apache.org> wrote:
> > > >
> > > > > I think most of your issues will be addressed by
> > > > > https://github.com/apache/airflow/pull/53492
> > > > > The PR code can be tested with Breeze, so you can set it up and see if it solves the problem. This will also help with confirming it's the right fix.
> > > > >
> > > > > On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias <christos...@gmail.com> wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > The scheduler is very efficient when running a large number of dags with up to 1000 tasks each. But in our case, we have dags with as many as 10,000 tasks, and in that scenario the scheduler and worker throughput drops significantly. Even if you have one such large dag with scheduled tasks, the performance hit becomes noticeable.
> > > > > >
> > > > > > We did some digging and found that the issue comes from the scheduler's _executable_task_instances_to_queued method (https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647), in particular the db query (https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375) and the examination of its results (https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425).
> > > > > >
> > > > > > If you have a very large dag and its tasks have been scheduled, the scheduler will keep examining those tasks for queueing even if it has already reached the maximum number of active tasks for that particular dag. Only once that fails does it move on to examine the scheduled tasks of the next dag or dag_run in line.
> > > > > >
> > > > > > This is inefficient and causes the throughput of the scheduler and the workers to drop significantly. If there are available slots in the pool and the max parallelism hasn't been reached yet, then the scheduler should stop processing a dag that has already reached its max capacity of active tasks.
> > > > > >
> > > > > > In addition, the number of scheduled tasks picked for examination should be capped at the number of max active tasks if that's lower than the query limit. If the active limit is 10 and we already have 5 running, then we can queue at most 5 more tasks, so we shouldn't examine more than that.
> > > > > >
> > > > > > There is already a patch with the changes mentioned above. IMO, these changes should be enabled/disabled with a config flag and not be on by default, because not everyone has the same needs as us. In our testing, adding a limit on the tasks retrieved from the db requires more processing in the query, which actually makes things worse when you have multiple small dags.
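As a rough illustration of the proposal above (skip a dag that is already at capacity, and cap how many of its scheduled tasks are examined), here is a plain-Python sketch. The function and variable names are hypothetical, and this is not the actual patch.

    # Sketch: pick at most `max_tis_per_query` TIs, never examining more TIs per
    # dag than that dag can still run. Inputs are hypothetical, pre-grouped data.
    def select_tis_to_queue(scheduled_tis_by_dag, running_count_by_dag,
                            max_active_tasks_per_dag, max_tis_per_query):
        selected = []
        for dag_id, tis in scheduled_tis_by_dag.items():
            remaining = max_active_tasks_per_dag - running_count_by_dag.get(dag_id, 0)
            if remaining <= 0:
                # dag is already at its active-task limit: move on instead of
                # examining (and rejecting) more of its scheduled tasks
                continue
            budget = min(remaining, max_tis_per_query - len(selected))
            selected.extend(tis[:budget])
            if len(selected) >= max_tis_per_query:
                break
        return selected

    # Mirrors the example described right below: 3 dags with 1000 scheduled tasks
    # each, a cap of 4 active tasks per dag, and a query limit of 32.
    scheduled = {f"dag{i}": [f"dag{i}_task{j}" for j in range(1000)] for i in (1, 2, 3)}
    picked = select_tis_to_queue(scheduled, {}, max_active_tasks_per_dag=4,
                                 max_tis_per_query=32)
    print(len(picked))  # 12, not 32: at most 4 per dag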
> > > > > > Here is a simple test case that makes the benefits of the improvements noticeable:
> > > > > >
> > > > > > - we have 3 dags with thousands of tasks each
> > > > > > - for simplicity let's have 1 dag_run per dag
> > > > > > - triggering them takes some time and due to that, the FIFO order of the tasks is very clear
> > > > > >   - e.g. 1000 tasks from dag1 were scheduled first and then 200 tasks from dag2 etc.
> > > > > > - the executor has parallelism=100 and slots_available=100, which means that it can run up to 100 tasks concurrently
> > > > > > - max_active_tasks_per_dag is 4, which means that we can have up to 4 tasks running per dag
> > > > > >   - for 3 dags, it means that we can run up to 12 tasks at the same time (4 tasks from each dag)
> > > > > > - max tis per query is set to 32, meaning that we can examine up to 32 scheduled tasks if there are available pool slots
> > > > > >
> > > > > > If we were to run the scheduler loop repeatedly until it queues 12 tasks, and test the part that examines the scheduled tasks and queues them, then:
> > > > > >
> > > > > > - with the query limit
> > > > > >   - 1 iteration, total time 0.05
> > > > > >   - during the iteration
> > > > > >     - we have parallelism 100, available slots 100 and query limit 32, which means that it will examine up to 32 scheduled tasks
> > > > > >     - it can queue up to 100 tasks
> > > > > >     - it examines 12 tasks (instead of 32)
> > > > > >       - 4 tasks from dag1, reaching the max for the dag
> > > > > >       - 4 tasks from dag2, reaching the max for the dag
> > > > > >       - 4 tasks from dag3, reaching the max for the dag
> > > > > >     - it queues 4 from dag1, reaches the max for the dag and moves on
> > > > > >     - it queues 4 from dag2, reaches the max for the dag and moves on
> > > > > >     - it queues 4 from dag3, reaches the max for the dag and moves on
> > > > > >     - it stops queueing because we have reached the maximum per dag, although there are slots for more tasks
> > > > > >     - the iteration finishes
> > > > > > - without the query limit
> > > > > >   - 3 iterations, total time 0.29
> > > > > >   - during iteration 1
> > > > > >     - examines 32 tasks, all from dag1 (due to FIFO)
> > > > > >     - queues 4 from dag1 and tries to queue the other 28 but fails
> > > > > >   - during iteration 2
> > > > > >     - examines the next 32 tasks from dag1
> > > > > >     - can't queue any of them because it has reached the max for dag1, since the previous 4 are still running
> > > > > >     - examines 32 tasks from dag2
> > > > > >     - queues 4 from dag2 and tries to queue the other 28 but fails
> > > > > >   - during iteration 3
> > > > > >     - examines the next 32 tasks from dag1, the same tasks that were examined in iteration 2
> > > > > >     - can't queue any of them because it has reached the max for dag1 and the first 4 are still running
> > > > > >     - examines 32 tasks from dag2, can't queue any of them because it has reached the max for dag2 as well
> > > > > >     - examines 32 tasks from dag3
> > > > > >     - queues 4 from dag3 and tries to queue the other 28 but fails
> > > > > >
> > > > > > I used very low values for all the configs so that I can make the point clear and easy to understand. If we increase them, then this patch also makes the task selection more fair and the resource distribution more even.
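Tallying the walkthrough above (the numbers are taken directly from the example, not measured independently), the difference is mostly in wasted examinations:

    # Totals implied by the example: both paths queue the same 12 tasks,
    # but they examine very different numbers of scheduled TIs to get there.
    examined_with_cap = [12]             # one iteration: 4 + 4 + 4, all queued
    examined_without_cap = [32, 64, 96]  # iter 1: dag1; iter 2: dag1+dag2; iter 3: dag1+dag2+dag3

    print(sum(examined_with_cap), "TIs examined with the per-dag cap")   # 12
    print(sum(examined_without_cap), "TIs examined without it")          # 192

The gap grows with the number of large dags and with max_tis_per_query, which is consistent with the reported 0.05 vs 0.29 total times.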
> > > > > > I would appreciate it if anyone familiar with the scheduler's code could confirm this and also provide any feedback.
> > > > > >
> > > > > > Additionally, I have one question regarding the query limit: should it be per dag_run or per dag? I've noticed that max_active_tasks_per_dag has been changed to provide a value per dag_run, but the docs haven't been updated.
> > > > > >
> > > > > > Thank you!
> > > > > >
> > > > > > Regards,
> > > > > > Christos Bisias