> The configurability was my recommendation for
> https://github.com/apache/airflow/pull/53492
> Given the fact that this change is at the heart of Airflow, I think the
> changes should be experimental, where users can switch between different
> strategies/modes of the scheduler.
> If and when we have enough data to support that a specific option is always
> better, we can make decisions accordingly.
Yeah, I guess looking at #53492 <https://github.com/apache/airflow/pull/53492>
it does seem too risky to just change the behavior in Airflow without releasing
it first as experimental. I doubt we can get sufficient real-world testing
without doing that. So if this is introduced, I think it should just be
introduced as an experimental optimization. And the intention would be that
ultimately there will only be one scheduling mode, and this is just a way to
test this out more widely. Not that we are intending to have two scheduling
code paths on a permanent basis. WDYT?

On Mon, Aug 4, 2025 at 12:50 AM Christos Bisias <christos...@gmail.com> wrote:

>> So my question to you is: is it impossible, or just demanding or difficult
>> to split your Dags into smaller dags connected with asset-aware scheduling?
>
> Jarek, I'm going to discuss this with the team and I will get you an answer
> on that.
>
> I've shared this again on the thread
>
> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>
> I haven't created a PR because this is just a POC and it's also setting a
> limit per dag. I would like to get feedback on whether it's better to make
> it per dag or per dag_run. I can create a draft PR if that's helpful and
> makes it easier to add comments.
>
> Let me try to explain the issue better. From a high-level overview, the
> scheduler
>
> 1. moves tasks to SCHEDULED
> 2. runs a query to fetch SCHEDULED tasks from the db
> 3. examines the tasks
> 4. moves tasks to QUEUED
>
> I'm focusing on step 2 and afterwards. The current code doesn't take into
> account *max_active_tasks_per_dag*. When it runs the query, it fetches up to
> *max_tis*, which is determined here
> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L697-L705>.
>
> For example,
>
> - the query limit is 32
> - all 32 tasks in line belong to the same dag, *dag1*
> - we are not concerned with how the scheduler picks them
> - *dag1* has *max_active_tasks* set to 5
>
> The current code will
>
> - get 32 tasks from *dag1*
> - start examining them one by one
> - once 5 are moved to *QUEUED*, it won't stop; it will keep examining the
>   other 27, but it won't be able to queue them because it has reached the
>   limit
>
> In the next loop, although we have reached the maximum number of tasks for
> *dag1*, the query will again fetch 32 tasks from *dag1* to examine them and
> try to queue them.
>
> The issue is that it fetches more tasks from the db than it can queue and
> then examines them all.
>
> This all leads to unnecessary processing that builds up, and the more load
> there is on the system, the more the throughput drops for the scheduler and
> the workers.
>
> What I'm proposing is to adjust the query in step 2 to check
> *max_active_tasks_per_dag*:
>
>> run a query to fetch SCHEDULED tasks from the db
>
> If a dag has already reached the maximum number of tasks in active states,
> it will be skipped by the query.
>
>> Don't we already stop examining at that point? I guess there are two things
>> you might be referring to. One is which TIs come out of the db and into
>> python, and the other is what we do in python. It just might be helpful to
>> be clear about the specific enhancements & changes you are making.
>
> I think that if we adjust the query and fetch the right number of tasks,
> then we won't have to make changes to what is done in python.
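To make the "adjust the query in step 2" idea concrete, here is a minimal
sketch in plain SQLAlchemy Core. The table and column names are simplified
stand-ins made up for this sketch, not the real Airflow models, and this is
not the patch linked above; it only illustrates skipping dags that already
have *max_active_tasks* task instances in active states.

    from sqlalchemy import Column, Integer, MetaData, String, Table, func, select

    metadata = MetaData()

    # Simplified stand-ins for the real tables (made up for this sketch).
    task_instance = Table(
        "task_instance", metadata,
        Column("dag_id", String),
        Column("state", String),
    )
    dag = Table(
        "dag", metadata,
        Column("dag_id", String),
        Column("max_active_tasks", Integer),
    )

    # Correlated count of a dag's task instances already occupying "active" slots.
    ti_active = task_instance.alias("ti_active")
    active_per_dag = (
        select(func.count())
        .where(ti_active.c.dag_id == dag.c.dag_id)
        .where(ti_active.c.state.in_(["queued", "running"]))
        .correlate(dag)
        .scalar_subquery()
    )

    max_tis = 32  # whatever the scheduler computed as the query limit

    # Step 2: fetch SCHEDULED task instances, but only from dags with headroom.
    query = (
        select(task_instance)
        .join(dag, dag.c.dag_id == task_instance.c.dag_id)
        .where(task_instance.c.state == "scheduled")
        .where(active_per_dag < dag.c.max_active_tasks)
        .limit(max_tis)
    )

This only covers the "skip saturated dags" part. Additionally capping how many
rows are fetched per dag, so a single dag cannot fill the whole *max_tis*
budget, would need something like a per-dag row limit (e.g. a window function),
which is the direction the window-function PR discussed below takes.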
> On Mon, Aug 4, 2025 at 8:01 AM Daniel Standish
> <daniel.stand...@astronomer.io.invalid> wrote:
>
>> @Christos Bisias
>>
>>> If you have a very large dag, and its tasks have been scheduled, then the
>>> scheduler will keep examining the tasks for queueing, even if it has
>>> reached the maximum number of active tasks for that particular dag. Once
>>> that fails, then it will move on to examine the scheduled tasks of the
>>> next dag or dag_run in line.
>>
>> Can you make this a little more precise? There's some protection against
>> "starvation", i.e. dag runs recently considered should go to the back of
>> the line next time.
>>
>> Maybe you could clarify why / how that's not working / not optimal / how
>> to improve.
>>
>>> If there are available slots in the pool and the max parallelism hasn't
>>> been reached yet, then the scheduler should stop processing a dag that
>>> has already reached its max capacity of active tasks.
>>
>> If a dag run (or dag) is already at max capacity, it doesn't really matter
>> if there are slots available or parallelism isn't reached -- shouldn't it
>> stop anyway?
>>
>>> In addition, the number of scheduled tasks picked for examining should be
>>> capped at the number of max active tasks if that's lower than the query
>>> limit. If the active limit is 10 and we already have 5 running, then we
>>> can queue at most 5 tasks. In that case, we shouldn't examine more than
>>> that.
>>
>> Don't we already stop examining at that point? I guess there are two
>> things you might be referring to. One is which TIs come out of the db and
>> into python, and the other is what we do in python. It just might be
>> helpful to be clear about the specific enhancements & changes you are
>> making.
>>
>>> There is already a patch with the changes mentioned above. IMO, these
>>> changes should be enabled/disabled with a config flag and not by default,
>>> because not everyone has the same needs as us. In our testing, adding a
>>> limit on the tasks retrieved from the db requires more processing on the
>>> query, which actually makes things worse when you have multiple small
>>> dags.
>>
>> I would like to see a stronger case made for configurability. Why make it
>> configurable? If the performance is always better, it should not be made
>> configurable, unless it's merely released as an opt-in experimental
>> feature. If it is worse in some profiles, let's be clear about that.
>>
>> I did not read anything after `Here is a simple test case that makes the
>> benefits of the improvements noticeable` because it seemed rather
>> long-winded detail about a test case. A higher-level summary might be
>> helpful to your audience. Is there a PR with your optimization? You wrote
>> "there is a patch" but did not, unless I missed something, share it. I
>> would take a look if you share it, though.
>>
>> Thanks
>>
>> On Sun, Aug 3, 2025 at 5:08 PM Daniel Standish
>> <daniel.stand...@astronomer.io> wrote:
>>
>>> Yes, UI is another part of this.
>>> At some point the grid and graph views completely stop making sense for
>>> that volume, and another type of view would be required, both for
>>> usability and for performance.
>>>
>>> On Sun, Aug 3, 2025 at 11:04 AM Jens Scheffler
>>> <j_scheff...@gmx.de.invalid> wrote:
>>>
>>>> Hi,
>>>>
>>>> We also have a current demand for a workflow that executes 10k to 100k
>>>> tasks. Together with @AutomationDev85 we are working on a local solution,
>>>> because we also saw problems in the Scheduler that are not linearly
>>>> scaling, and they are for sure not easy to fix. But from our
>>>> investigation there are also other problems to be considered, like the
>>>> UI, which will also potentially have problems.
>>>>
>>>> I am a bit sceptical that PR 49160 completely fixes the problems
>>>> mentioned here, and I made some comments. I do not want to stop
>>>> enthusiasm to fix and improve things, but the Scheduler is quite complex
>>>> and changes need to be made with care.
>>>>
>>>> Actually I like the patch
>>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>>>> as it just adds a limit preventing the scheduler from focusing on only
>>>> one run. But the complexity is a bit big for a "patch" :-D
>>>>
>>>> I'd also propose, for the moment, the way that Jarek described: split up
>>>> the Dag into multiple parts (divide and conquer).
>>>>
>>>> Otherwise, if there is a concrete demand for such large Dags... we maybe
>>>> need a broader initiative if we want to ensure 10k, 100k, 1M? tasks are
>>>> supported per Dag. Because depending on the magnitude we strive for,
>>>> different approaches are needed.
>>>>
>>>> Jens
>>>>
>>>> On 03.08.25 16:33, Daniel Standish wrote:
>>>>
>>>>> Definitely an area of the scheduler with some opportunity for
>>>>> performance improvement.
>>>>>
>>>>> I would just mention that you should also attempt to include some
>>>>> performance testing at load / scale, because window functions are going
>>>>> to be more expensive.
>>>>>
>>>>> What happens when you have many dags, many historical dag runs & TIs,
>>>>> and lots of stuff running concurrently? You need to be mindful of the
>>>>> overall impact of such a change, and not look only at the time spent on
>>>>> scheduling this particular dag.
>>>>>
>>>>> I did not look at the PRs yet, maybe you've covered this, but it's
>>>>> important.
>>>>>
>>>>> On Sun, Aug 3, 2025 at 5:57 AM Christos Bisias <christos...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm going to review the PR code and test it more thoroughly before
>>>>>> leaving a comment.
>>>>>>
>>>>>> This is my code for reference
>>>>>>
>>>>>> https://github.com/xBis7/airflow/compare/69ab304ffa3d9b847b7dd0ee90ee6ef100223d66..scheduler-perf-patch
>>>>>>
>>>>>> The current version is setting a limit per dag, across all dag_runs.
>>>>>>
>>>>>> Please correct me if I'm wrong, but the PR looks like it's changing the
>>>>>> way that tasks are prioritized to avoid starvation. If that's the case,
>>>>>> I'm not sure that this is the same issue. My proposal is that, if we
>>>>>> have reached the max resources assigned to a dag, then stop processing
>>>>>> its tasks and move on to the next one.
>>>>>> I'm not changing how or which tasks are picked.
>>>>>>
>>>>>> On Sun, Aug 3, 2025 at 3:23 PM asquator <asqua...@proton.me.invalid>
>>>>>> wrote:
>>>>>>
>>>>>>> Thank you for the feedback.
>>>>>>> Please describe the case with the failing limit checks in the PR (the
>>>>>>> DAG's parameters, its tasks' parameters, and what fails to be checked)
>>>>>>> and we'll try to fix it ASAP before you test it again. Let's continue
>>>>>>> the PR-related discussion in the PR itself.
>>>>>>>
>>>>>>> On Sunday, August 3rd, 2025 at 2:21 PM, Christos Bisias
>>>>>>> <christos...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thank you for bringing this PR to my attention.
>>>>>>>>
>>>>>>>> I haven't studied the code, but I ran a quick test on the branch and
>>>>>>>> it completely ignores the limit on scheduled tasks per dag or
>>>>>>>> dag_run. It grabbed 70 tasks from the first dag and then moved all 70
>>>>>>>> to QUEUED without any further checks.
>>>>>>>>
>>>>>>>> This is how I tested it
>>>>>>>>
>>>>>>>> https://github.com/Asquator/airflow/compare/feature/pessimistic-task-fetching-with-window-function...xBis7:airflow:scheduler-window-function-testing?expand=1
>>>>>>>>
>>>>>>>> On Sun, Aug 3, 2025 at 1:44 PM asquator <asqua...@proton.me.invalid>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>> This is a known issue stemming from the optimistic scheduling
>>>>>>>>> strategy used in Airflow. We do address this in the above-mentioned
>>>>>>>>> PR. I want to note that there are many cases where this problem may
>>>>>>>>> appear: it was originally detected with pools, but we are striving
>>>>>>>>> to fix it in all cases, such as the one described here with
>>>>>>>>> max_active_tis_per_dag, by switching to pessimistic scheduling with
>>>>>>>>> SQL window functions. While the current strategy simply pulls the
>>>>>>>>> max_tis tasks and drops the ones that do not meet the constraints,
>>>>>>>>> the new strategy will pull only the tasks that are actually ready to
>>>>>>>>> be scheduled and that comply with all concurrency limits. It would
>>>>>>>>> be very helpful for pushing this change to production if you could
>>>>>>>>> assist us in alpha-testing it.
>>>>>>>>>
>>>>>>>>> See also:
>>>>>>>>> https://github.com/apache/airflow/discussions/49160
>>>>>>>>>
>>>>>>>>> Sent with Proton Mail secure email.
>>>>>>>>>
>>>>>>>>> On Sunday, August 3rd, 2025 at 12:59 PM, Elad Kalif
>>>>>>>>> <elad...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I think most of your issues will be addressed by
>>>>>>>>>> https://github.com/apache/airflow/pull/53492
>>>>>>>>>> The PR code can be tested with Breeze, so you can set it up and see
>>>>>>>>>> if it solves the problem; this will also help with confirming it's
>>>>>>>>>> the right fix.
>>>>>>>>>>
>>>>>>>>>> On Sun, Aug 3, 2025 at 10:46 AM Christos Bisias
>>>>>>>>>> <christos...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello,
>>>>>>>>>>>
>>>>>>>>>>> The scheduler is very efficient when running a large number of
>>>>>>>>>>> dags with up to 1000 tasks each. But in our case, we have dags
>>>>>>>>>>> with as many as 10,000 tasks.
>>>>>>>>>>> And in that scenario the scheduler and worker throughput drops
>>>>>>>>>>> significantly. Even if you have one such large dag with scheduled
>>>>>>>>>>> tasks, the performance hit becomes noticeable.
>>>>>>>>>>>
>>>>>>>>>>> We did some digging and we found that the issue comes from the
>>>>>>>>>>> scheduler's _executable_task_instances_to_queued method
>>>>>>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L293C9-L647>,
>>>>>>>>>>> in particular the db query here
>>>>>>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L364-L375>
>>>>>>>>>>> and the examining of the results here
>>>>>>>>>>> <https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/jobs/scheduler_job_runner.py#L425>.
>>>>>>>>>>>
>>>>>>>>>>> If you have a very large dag, and its tasks have been scheduled,
>>>>>>>>>>> then the scheduler will keep examining the tasks for queueing,
>>>>>>>>>>> even if it has reached the maximum number of active tasks for that
>>>>>>>>>>> particular dag. Once that fails, it will move on to examine the
>>>>>>>>>>> scheduled tasks of the next dag or dag_run in line.
>>>>>>>>>>>
>>>>>>>>>>> This is inefficient and causes the throughput of the scheduler and
>>>>>>>>>>> the workers to drop significantly. If there are available slots in
>>>>>>>>>>> the pool and the max parallelism hasn't been reached yet, then the
>>>>>>>>>>> scheduler should stop processing a dag that has already reached
>>>>>>>>>>> its max capacity of active tasks.
>>>>>>>>>>>
>>>>>>>>>>> In addition, the number of scheduled tasks picked for examining
>>>>>>>>>>> should be capped at the number of max active tasks if that's lower
>>>>>>>>>>> than the query limit. If the active limit is 10 and we already
>>>>>>>>>>> have 5 running, then we can queue at most 5 tasks. In that case,
>>>>>>>>>>> we shouldn't examine more than that.
>>>>>>>>>>>
>>>>>>>>>>> There is already a patch with the changes mentioned above. IMO,
>>>>>>>>>>> these changes should be enabled/disabled with a config flag and
>>>>>>>>>>> not by default, because not everyone has the same needs as us. In
>>>>>>>>>>> our testing, adding a limit on the tasks retrieved from the db
>>>>>>>>>>> requires more processing on the query, which actually makes things
>>>>>>>>>>> worse when you have multiple small dags.
>>>>>>>>>>>
>>>>>>>>>>> Here is a simple test case that makes the benefits of the
>>>>>>>>>>> improvements noticeable:
>>>>>>>>>>>
>>>>>>>>>>> - we have 3 dags with thousands of tasks each
>>>>>>>>>>> - for simplicity, let's have 1 dag_run per dag
>>>>>>>>>>> - triggering them takes some time and, due to that, the FIFO order
>>>>>>>>>>>   of the tasks is very clear, e.g. 1000 tasks from dag1 were
>>>>>>>>>>>   scheduled first, then 200 tasks from dag2, etc.
>>>>>>>>>>> - the executor has parallelism=100 and slots_available=100, which
>>>>>>>>>>>   means that it can run up to 100 tasks concurrently
>>>>>>>>>>> - max_active_tasks_per_dag is 4, which means that we can have up
>>>>>>>>>>>   to 4 tasks running per dag
>>>>>>>>>>>   - for 3 dags, it means that we can run up to 12 tasks at the
>>>>>>>>>>>     same time (4 tasks from each dag)
>>>>>>>>>>> - max tis per query is set to 32, meaning that we can examine up
>>>>>>>>>>>   to 32 scheduled tasks if there are available pool slots
>>>>>>>>>>>
>>>>>>>>>>> If we were to run the scheduler loop repeatedly until it queues 12
>>>>>>>>>>> tasks, and test the part that examines the scheduled tasks and
>>>>>>>>>>> queues them, then
>>>>>>>>>>>
>>>>>>>>>>> - with the query limit
>>>>>>>>>>>   - 1 iteration, total time 0.05
>>>>>>>>>>>   - during the iteration
>>>>>>>>>>>     - we have parallelism 100, available slots 100 and query limit
>>>>>>>>>>>       32, which means that it will examine up to 32 scheduled
>>>>>>>>>>>       tasks
>>>>>>>>>>>     - it can queue up to 100 tasks
>>>>>>>>>>>     - examines 12 tasks (instead of 32)
>>>>>>>>>>>       - 4 tasks from dag1, reaching the max for the dag
>>>>>>>>>>>       - 4 tasks from dag2, reaching the max for the dag
>>>>>>>>>>>       - and 4 tasks from dag3, reaching the max for the dag
>>>>>>>>>>>     - queues 4 from dag1, reaches the max for the dag and moves on
>>>>>>>>>>>     - queues 4 from dag2, reaches the max for the dag and moves on
>>>>>>>>>>>     - queues 4 from dag3, reaches the max for the dag and moves on
>>>>>>>>>>>     - stops queueing because we have reached the maximum per dag,
>>>>>>>>>>>       although there are slots for more tasks
>>>>>>>>>>>     - iteration finishes
>>>>>>>>>>> - without
>>>>>>>>>>>   - 3 iterations, total time 0.29
>>>>>>>>>>>   - during iteration 1
>>>>>>>>>>>     - examines 32 tasks, all from dag1 (due to FIFO)
>>>>>>>>>>>     - queues 4 from dag1 and tries to queue the other 28 but fails
>>>>>>>>>>>   - during iteration 2
>>>>>>>>>>>     - examines the next 32 tasks from dag1
>>>>>>>>>>>     - it can't queue any of them because it has reached the max
>>>>>>>>>>>       for dag1, since the previous 4 are still running
>>>>>>>>>>>     - examines 32 tasks from dag2
>>>>>>>>>>>     - queues 4 from dag2 and tries to queue the other 28 but fails
>>>>>>>>>>>   - during iteration 3
>>>>>>>>>>>     - examines the next 32 tasks from dag1, the same tasks that
>>>>>>>>>>>       were examined in iteration 2
>>>>>>>>>>>     - it can't queue any of them because it has reached the max
>>>>>>>>>>>       for dag1 and the first 4 are still running
>>>>>>>>>>>     - examines 32 tasks from dag2, can't queue any of them because
>>>>>>>>>>>       it has reached the max for dag2 as well
>>>>>>>>>>>     - examines 32 tasks from dag3
>>>>>>>>>>>     - queues 4 from dag3 and tries to queue the other 28 but fails
>>>>>>>>>>>
>>>>>>>>>>> I used very low values for all the configs so that I can make the
>>>>>>>>>>> point clear and easy to understand. If we increase them, then this
>>>>>>>>>>> patch also makes the task selection more fair and the resource
>>>>>>>>>>> distribution more even.
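(To make the arithmetic in the walkthrough above concrete, here is a tiny
standalone Python sketch. It is not Airflow code; the function and all names
and numbers are made up to mirror the toy example, contrasting an uncapped
fetch with a per-dag-capped fetch.)

    from collections import Counter

    def plan_fetch(scheduled, running, max_active_per_dag, query_limit, per_dag_cap):
        """Return the dag_ids of the TIs the scheduler would pull for examination."""
        picked, per_dag = [], Counter()
        for dag_id in scheduled:  # FIFO order, as in the example above
            if len(picked) >= query_limit:
                break
            headroom = max_active_per_dag - running.get(dag_id, 0)
            if per_dag_cap and per_dag[dag_id] >= headroom:
                # dag already has as many candidates as it can still queue
                continue
            per_dag[dag_id] += 1
            picked.append(dag_id)
        return picked

    # 3 dags, nothing running yet, max_active_tasks_per_dag=4, max_tis=32.
    scheduled = ["dag1"] * 1000 + ["dag2"] * 200 + ["dag3"] * 200
    print(len(plan_fetch(scheduled, {}, 4, 32, per_dag_cap=False)))  # 32, all from dag1
    print(len(plan_fetch(scheduled, {}, 4, 32, per_dag_cap=True)))   # 12, 4 per dag

The uncapped variant spends its whole 32-row budget on dag1, of which only 4
can be queued; the capped variant pulls exactly the 12 that can actually be
queued, which is the effect described in the walkthrough above.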
>>>>>>>>>>> I would appreciate it if anyone familiar with the scheduler's
>>>>>>>>>>> code could confirm this and also provide any feedback.
>>>>>>>>>>>
>>>>>>>>>>> Additionally, I have one question regarding the query limit.
>>>>>>>>>>> Should it be per dag_run or per dag? I've noticed that
>>>>>>>>>>> max_active_tasks_per_dag has been changed to provide a value per
>>>>>>>>>>> dag_run, but the docs haven't been updated.
>>>>>>>>>>>
>>>>>>>>>>> Thank you!
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Christos Bisias
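As a footnote to the thread: the pessimistic, window-function-based fetching
that asquator describes upthread (pull only the TIs that comply with the
concurrency limits, instead of pulling max_tis rows and dropping the excess)
could look roughly like the sketch below. This is a toy rendering in plain
SQLAlchemy Core with simplified, made-up table and column names; it is not the
implementation in PR #53492, and it ignores pools, dag_run-level limits, and
per-task limits that the real scheduler also has to respect.

    from sqlalchemy import Column, Integer, MetaData, String, Table, func, select

    metadata = MetaData()

    # Simplified stand-ins for the real tables (made up for this sketch).
    task_instance = Table(
        "task_instance", metadata,
        Column("dag_id", String),
        Column("state", String),
        Column("priority_weight", Integer),
    )
    dag = Table(
        "dag", metadata,
        Column("dag_id", String),
        Column("max_active_tasks", Integer),
    )

    # Rank the SCHEDULED TIs within each dag (highest priority first).
    ranked = (
        select(
            task_instance,
            func.row_number()
            .over(
                partition_by=task_instance.c.dag_id,
                order_by=task_instance.c.priority_weight.desc(),
            )
            .label("rn"),
        )
        .where(task_instance.c.state == "scheduled")
        .subquery("ranked")
    )

    # How many active-state TIs each dag already has.
    active = (
        select(task_instance.c.dag_id, func.count().label("active_count"))
        .where(task_instance.c.state.in_(["queued", "running"]))
        .group_by(task_instance.c.dag_id)
        .subquery("active")
    )

    max_tis = 32  # overall query limit, as today

    # Keep only as many SCHEDULED TIs per dag as that dag can still queue.
    query = (
        select(ranked)
        .join(dag, dag.c.dag_id == ranked.c.dag_id)
        .outerjoin(active, active.c.dag_id == ranked.c.dag_id)
        .where(
            ranked.c.rn
            <= dag.c.max_active_tasks - func.coalesce(active.c.active_count, 0)
        )
        .limit(max_tis)
    )

Whether the extra cost of the window function pays off under many small dags
is exactly the trade-off raised earlier in the thread, which is another reason
to treat any such change as an opt-in experimental mode at first.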