> ...sorry need to dis-agree: We were under fire as of a lot of

Maybe you misunderstood me; sorry that I was not clear. It was not about your hotfix, and I absolutely do not deny that Airflow stability and compatibility issues are the root of your problems. I was more focused on the KPO modification you made previously. Sorry if it came across that way. I know you were under pressure, and this wasn't ideal. Yes, improving Airflow 3.2 and future stability is a collective responsibility, not solely yours.
And if anything, that should be a lesson for us: we likely overdid it in throwing out features and behaviours that made our code slightly better and our maintenance slightly easier, at the expense of big pain for our users, getting close to the point where they question whether Airflow is good for them. Personally, I think we traded too many maintainer conveniences for user pain, and we should learn from that in our future decisions. I was merely referring to the KPO modifications you made previously: those caused additional rework when a "good" solution was proposed and will be implemented. Of course, it might mean that our KPO is badly designed, not following the "Open-Closed" principle https://en.wikipedia.org/wiki/Open%E2%80%93closed_principle = "open for extensions, closed for modification". And of course, with the "fire" you're experiencing, it might feel nearly suicidal to raise such issues now with your management. Alternatively, it might genuinely be an issue on our side, or perhaps, as usual, the truth lies somewhere in between. We should make it clearer to our users that we welcome their feedback when our designs make it difficult for them to extend Airflow in ways that benefit them. So, sorry if my comment came across in a way that made you feel this way; that was certainly not the intention. I absolutely understand and recognise your pains, and I think we all should. In the future, we must treat compatibility and behavioural changes as an even more important topic for discussion. We need to have those discussions and not rush them; in the long run that makes our lives easier. J. On Wed, Mar 25, 2026 at 11:53 PM Jens Scheffler <[email protected]> wrote: > Hi Jarek, > > regarding: > > > And sorry for the rework, but this is pretty expected that such > > customisation will need updating now and then.
But then you have a nice > > case for Bosch: "Look, this is what happens, when we do not upstream our > > changes and do not do it in the way that is extendable and "product" wise. > > So next time let's spend a bit more time when we customize things to make > > them upstreamable and do not double-spend on it. You will have a nice case to > > show as an example (just saying - and I know it's not that easy - trying to > > find the bright side of it). > > ...sorry, need to disagree: We were under fire as of a lot of > instabilities, and this problem was the tip of the iceberg. We needed to > improve radically in a short timeframe. No time to have a multi-week > discussion. Our house was on fire. And as initially described, we > considered multiple options, and for us (short term) we went the route > which was not complex / minimally invasive. > > I think discussions are totally fair, and the important thing is the > reasons - which are given as good opinions that I accept. It is not > about the rework, but based on the feedback I assume many are suffering from > this. Will raise a VOTE hoping that the needed groundwork as enablement > can be added to 3.2.0, such that the hotfix we needed to make does not > need to be applied to all other installs suffering from the problem and > it can be resolved with 3.2.0. > > On 25.03.26 00:39, Jarek Potiuk wrote: > > Jens: > > > > I think that looks way more reasonable for now. I would leave detailed > > review to those who know that part better - but from the logical > > /architectural point of view, this is way more "Airflow-y". > > > > And sorry for the rework, but this is pretty expected that such > > customisation will need updating now and then. But then you have a nice > > case for Bosch: "Look, this is what happens, when we do not upstream our > > changes and do not do it in the way that is extendable and "product" wise.
> > So next time let's spend a bit more time when we customize things to make > > them upstreamable and do not double-spend on it. You will have a nice case to > > show as an example (just saying - and I know it's not that easy - trying to > > find the bright side of it). > > > > Przemysław: > > > >> 1. Modification of priority weight to have a greater value for tasks which > > are running (task has higher priority by design if it is a deferred task and > > was already running in the past) > > > > We had **LOTS** of talks and even AIP > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-100+Eliminate+Scheduler+Starvation+On+Concurrency+Limits?focusedCommentId=406619934 > > discussing options - I think after 3.3, one of the things to think about is > > to finally lead that one to completion and redesign the priority weights in > > general - because they have already reached their limits. > > > >> 2. Doing something like pre-emption from Operating Systems, but on tasks, > > which would mean that if a higher priority task is waiting and a lower priority > > task is running, the higher priority task can put the lower one to sleep and take > > its slot > > > > Yes. IMHO preemption is the only way to actually deal with some of the > > scenarios. Logically, you cannot achieve both optimal machine utilization > > and optimal latency for task chain execution without preemption. And even > > with preemption - you achieve it by taking a (smaller) hit on utilization. > > Instead of reserving free slots for potential high-priority workloads, you > > decide to kill some in-progress tasks to run high-priority ones - thus > > lowering utilization due to all the already partially run but preempted > > tasks. This is really nice if you have rare spikes in traffic, because > > utilization only takes a hit when there is a high load. Reserving slots in > > any way (queues, etc.) causes a utilization hit, except during spikes.
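The utilization-vs-latency trade-off described above can be illustrated with a small toy model. All names here are illustrative stand-ins, not Airflow code: a fixed pool of slots where a high-priority arrival may preempt the lowest-priority running task instead of waiting for a free slot.

```python
import heapq
from typing import Optional


class PreemptiveSlotPool:
    """Toy model of the trade-off above: a fixed number of slots, where
    a high-priority arrival may evict the lowest-priority running task
    instead of waiting for a free slot. Illustrative only - these names
    are not Airflow APIs."""

    def __init__(self, slots: int):
        self.slots = slots
        self.running: list = []  # min-heap of (priority, task_id)

    def submit(self, priority: int, task_id: str) -> Optional[str]:
        """Admit the task; return the id of a preempted task, if any."""
        if len(self.running) < self.slots:
            heapq.heappush(self.running, (priority, task_id))
            return None
        lowest_priority, lowest_id = self.running[0]
        if priority > lowest_priority:
            # Preempt: the partially-run task loses its slot (the
            # utilization cost mentioned above), but latency for the
            # high-priority task stays low.
            heapq.heapreplace(self.running, (priority, task_id))
            return lowest_id
        return None  # lower priority: must wait (queueing not modeled)
```

With two slots occupied by priorities 1 and 2, submitting priority 5 preempts the priority-1 task: utilization only takes a hit during the spike, exactly as the paragraph above argues.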
> > > > But it's not something we necessarily need to handle ourselves. Perhaps > > this is a sign that we could use our "Airflow as a platform" approach to > > delegate some of those tasks out long term. Airflow is good at the > > scheduling and conditional logic of workflows of various provenance (which > > is what Airflow actually excels at - this is our bread and butter), but > > Airflow has never been about optimizing utilisation. And YuniKorn is > > designed for it (while it does not handle complex logic) - so the combo is > > something I've always thought is a good idea. (And the good news is that > > there is a rumour that at the Airflow Summit we will have a really nice talk > > about it from one of our PMC members, whose team already does it at scale in > > one of the biggest companies in the world - just rumours...) > > > > J. > > > > > > On Tue, Mar 24, 2026 at 11:37 PM Przemysław Mirowski <[email protected]> > > wrote: > > > >> Hi all, > >> > >> Thank you Jens for bringing up this topic, as I would say it is not a > >> niche issue in higher load Airflow deployments which are using Deferred > >> Operators. > >> > >> I haven't checked, in detail, all PRs created regarding this topic, but I > >> thought I would share my 2c. > >> > >> From my perspective, the issue affects any setup which is using the > >> Triggerer, as mentioned by André in the past. I'm not sure if there is an ideal > >> solution to this issue, so probably we could do a "patch" thing until the > >> proper one is designed, implemented and tested. > >> > >> One idea which I haven't seen on the devlist yet could be: > >> > >> 1. > >> Modification of priority weight to have a greater value for tasks which > >> are running (task has higher priority by design if it is a deferred task > >> and was already running in the past) > >> 2.
> >> Doing something like pre-emption from Operating Systems, but on tasks, > >> which would mean that if a higher priority task is waiting and a lower priority > >> task is running, the higher priority task can put the lower one to sleep and take > >> its slot. > >> > >> Of course, doing step 2 would require design changes and a smart way of > >> stopping tasks without breaking their operations (probably a per-operator > >> implementation, something like the on_kill method), but in the end it could > >> possibly resolve the issue. That approach could lead to starvation of the > >> lower priority tasks, but as they are lower priority, it should not be a > >> huge issue. > >> > >> Regarding the different options - I would not really be in favour of doing > >> something which would sit next to the existing logic and be excluded from any > >> limit. In high workload environments that could lead to e.g. higher resource > >> consumption on particular components (which would be hard to predict), which > >> could lead to stability/performance issues. > >> > >> Przemek > >> > >> ________________________________ > >>> From: Jens Scheffler <[email protected]> > >>> Sent: 22 March 2026 18:25 > >>> To: [email protected] <[email protected]> > >>> Subject: Re: [DISCUSS] PR: Option to directly submit to worker queue > >> from Triggerer > >>> Hi Airflow Devs, > >>> > >>> According to the feedback and a bit of Claude, I have made two PRs which > >>> would resolve the issue, at least for KPO and only for cases where no > >>> callback is needed: > >>> > >>> * https://github.com/apache/airflow/pull/64068 - Feature in Core to > >>> support XCom results in Triggerer > >>> * https://github.com/apache/airflow/pull/64069 - Implement XCom push > >>> from Triggerer in KPO > >>> > >>> I'd be VERY happy if the discussion gets on a path where one or the other > >>> solution gets into 3.2.0 (else we will need to locally patch for more > >>> months...)
> >>> > > >>> Compared to the initial proposals, this is not adding any executor > >>> coupling to Triggerer; with the two PRs it only applies to KPO, and only > >>> if no callbacks are needed. > >>> > >>> For us personally the alternative is a bit harder, as we subclassed KPO > >>> and all additional features then need to be re-implemented in a > >> triggerer. > >>> All other operators in a similar fashion (KubernetesJobOperator, or all > >>> other deferred operators which return from Triggerer to Worker - I could only find AWS > >>> EKS and Google GKE, but there might be more around) would need similar > >>> extensions/changes. And all users depending on callbacks still have > >>> the same problem. > >>> > >>> Jens > >>> > >>> On 21.03.26 23:22, Jarek Potiuk wrote: > >>>> The idea to put all the closing into KPO would mean this is then only a > >>> solution in KPO cases, all other tasks related to Triggerer > >>> re-scheduling would still suffer from the same problem. So it would be a > >>> point solution. > >>> > >>> All other cases do not suffer from resources taken by the KPO pod. I > >> think > >>> the case you described is **really** KPO bound and the real problem is > >> not > >>> that the "next" method has lower priority. I believe that executing > >>> higher-priority tasks before lower-priority "next" tasks, after the > >> trigger > >>> completes, is exactly how Airflow is designed. In this case I think the > >>> problem is that we are using the "next" deferrable method in KPOTrigger > >> to > >>> do something that was **never** supposed to run as "next." A side effect > >> is > >>> that the "pod" must be kept for much longer than necessary just to > >> extract > >>> the XCom. That, to be honest, makes very little sense; it seems like > >> using > >>> a screwdriver to drive in nails instead of a hammer.
> >>> > > >>> If the "next" method is only supposed to retrieve the XCom and save it to > >>> the database, there is absolutely no reason to have a scheduled task for > >>> it - this is, IMHO, the biggest problem to solve. Do > >>> you think there is a good reason to use the sidecar volume to store XCom > >>> data in this case, rather than Airflow's regular storage for XCom? For me > >>> that looks like a bad idea, where we essentially use the Pod's side-car to > >> store > >>> a value in some kind of intermediate storage, while there is nothing to > >>> prevent us from storing it in the actual "target" storage. > >>> > >>>> But I think this is also one of the shortcomings that @dabla mentioned > >>> multiple times when he was starting to contribute the Async Iterator - I > >>> see no API endpoint in triggerer code (see e.g. > >> providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py) > >>> where the current API has an option to get a DB, supervisor or XCom > >>> context. Maybe I am blind and not seeing this, but I fear this is just > >>> missing and would be a larger enablement in the core for Triggerer to > >>> have such functions that regular tasks executed by the SDK have as > >>> context. Also there is no `context` object which provides this... but > >>> happy if you can enlighten me where and how code in Triggerer would be > >>> able to write an XCom, if available today. > >>> > >>> We can add it if it's missing; that's not an issue. It's a far less > >>> disruptive change than triggering communication with other components. > >> This > >>> is a code change rather than an architectural change, which is always > >>> easier for Airflow users to implement (you just upgrade the Airflow > >> version > >>> - no other changes in deployment are needed).
> >>> > > >>>> In our case it is also a bit more complex - but it would be fair to say it > >>> is then a Bosch-only problem - we have inherited a custom operator on > >>> top of KPO to cover specific error triaging, which e.g. marks a task failed > >>> w/o retry if we see there is a deterministic error (like > >>> ImagePullBackOff - we know the image is not there, so we do not want to retry - or > >>> also for cases where our workload fails with problems we know are > >>> deterministic and a retry would be just a waste of time and resources, like > >>> an assert in the application we are testing). If all that logic is moved > >>> to the triggerer it will indeed be a bit harder, as you know deploying > >>> changes to the triggerer is less flexible than some operator code in the Dag > >>> source tree. But we can also make this work... with more time invested. But > >>> XCom needs to be resolved. > >>> > >>> Yes, I understand that - but IMHO this is building on a flaw in the > >> current > >>> implementation (which was never intended to be extended). And in this > >> case, > >>> as I understand it, that would involve adding the same functionality to > >>> KPOTrigger, not KPO itself. Yes, a breaking change for you, but I think > >>> overall a better future for Airflow maintainability. With this change, I > >>> think the optimisation is even better than patching the behavior of the > >>> current KPO. And if need be - we can even add some "extension points" > >> and > >>> the ability to use a custom KPOTrigger rather than a custom KPO. I don't > >>> think this is a major change (even if it is somewhat more complex for > >> some > >>> users - like Bosch). However, with this you can get even better > >> performance > >>> than with the "hack" using an executor, because you will entirely skip > >>> re-entry and the need to start a new pod **just** to pull the XCom and > >> save > >>> it in the database. That sounds like an overall win-win.
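The flow proposed here - have the trigger persist the XCom via the triggerer's supervising process and complete the task without any worker re-entry - can be sketched conceptually. Everything below (`Supervisor`, `finalize_in_triggerer`, the "success" state) is a hypothetical stand-in, not Airflow API:

```python
import asyncio

# Conceptual sketch of the proposal above: the trigger asks a
# supervisor-like object (standing in for the triggerer's supervising
# process, which has DB access) to persist the XCom, then completes the
# task - so no "re-entry" task ever goes back through scheduling.
# Supervisor and finalize_in_triggerer are hypothetical stand-ins.


class Supervisor:
    def __init__(self):
        self.xcom_store: dict = {}  # stands in for the XCom table/backend

    async def xcom_push(self, task_id: str, value: str) -> None:
        await asyncio.sleep(0)  # the write is I/O-bound, fine to do async
        self.xcom_store[task_id] = value


async def finalize_in_triggerer(supervisor: Supervisor, task_id: str,
                                sidecar_value: str) -> str:
    # Instead of handing back to a worker just to pull the XCom, persist
    # it via the supervisor and mark the task done in one step.
    await supervisor.xcom_push(task_id, sidecar_value)
    return "success"


async def main():
    sup = Supervisor()
    state = await finalize_in_triggerer(sup, "kpo_task", '{"rows": 42}')
    return state, sup.xcom_store


state, store = asyncio.run(main())
```

Because the push is async and per-task, many deferred KPOs could be finalized concurrently on one triggerer, which is the scaling argument made above.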
> >>> > > >>> That might be a major change (breaking, requiring significant > >>> info/changelog for KPO) - requiring a new version of Airflow to provide > >> the > >>> API that is possibly missing in 3.1.0 - but IMHO absolutely worth doing > >> - > >>> it could also be nicely conditioned on the version of Airflow that cncf.Kubernetes is > >>> installed on. > >>> > >>> I think, while slightly more complex for Bosch to pull off, it might be > >>> better for Airflow as a product. > >>> > >>> J. > >>> > >>> > >>> > >>> On Sat, Mar 21, 2026 at 10:49 PM Jens Scheffler<[email protected]> > >> wrote: > >>>> Hi Jarek, et al. > >>>> > >>>> You are not missing anything obvious, and I understand the rationale: > >>>> coupling is what your concern is. > >>>> > >>>> So your counter-proposal would get back to the point I had in my list of > >>>> alternatives with the name "Finish-up the work on triggerer w/o return > >>>> to Worker". That had the ending with "Main blocker in this view is XCom > >>>> access." > >>>> > >>>> The idea to put all the closing into KPO would mean this is then only a > >>>> solution in KPO cases, all other tasks related to Triggerer > >>>> re-scheduling would still suffer from the same problem. So it would be a > >>>> point solution. > >>>> > >>>> But I think this is also one of the shortcomings that @dabla mentioned > >>>> multiple times when he was starting to contribute the Async Iterator - I > >>>> see no API endpoint in triggerer code (see e.g. > >> providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py) > >>>> where the current API has an option to get a DB, supervisor or XCom > >>>> context. Maybe I am blind and not seeing this, but I fear this is just > >>>> missing and would be a larger enablement in the core for Triggerer to > >>>> have such functions that regular tasks executed by the SDK have as > >>>> context. Also there is no `context` object which provides this...
but > >>>> happy if you can enlighten me where and how code in Triggerer would be > >>>> able to write an XCom, if available today. > >>>> > >>>> In our case it is also a bit more complex - but it would be fair to say it > >>>> is then a Bosch-only problem - we have inherited a custom operator on > >>>> top of KPO to cover specific error triaging, which e.g. marks a task failed > >>>> w/o retry if we see there is a deterministic error (like > >>>> ImagePullBackOff - we know the image is not there, so we do not want to retry - or > >>>> also for cases where our workload fails with problems we know are > >>>> deterministic and a retry would be just a waste of time and resources, like > >>>> an assert in the application we are testing). If all that logic is moved > >>>> to the triggerer it will indeed be a bit harder, as you know deploying > >>>> changes to the triggerer is less flexible than some operator code in the Dag > >>>> source tree. But we can also make this work... with more time invested. But > >>>> XCom needs to be resolved. > >>>> > >>>> Jens > >>>> > >>>> On 21.03.26 15:56, Jarek Potiuk wrote: > >>>>> It took me a bit of time - I wanted to go a bit deeper and look closer. > >>>>> I also share a bit of Niko's concerns. Running executor code from the > >>>>> Triggerer significantly changes the architectural approach and > >>>> boundaries. > >>>>> And yeah - it's not really tied to multi-team, it's a general issue (in > >>>>> multi-team, the triggerer is per team, so you can be sure that "what is in > >>>> team, > >>>>> stays in team"). > >>>>> > >>>>> But it does change which system components communicate with each other. > >>>> For > >>>>> example, the Triggerer needs access to the Celery queue for Celery > >>>>> Executor, which is otherwise unnecessary. Similarly, the Triggerer will > >>>>> need access to start Pods if the K8S executor is used. I'm not even > >> sure > >>>>> what would happen with the Edge executor.
From what I understand, the > >>>> Edge > >>>>> Executor actively loops and checks for tasks, purges jobs, etc. It pulls > >>>>> data from the database, so I can imagine it simply enqueues the workload > >>>> and > >>>>> then the "real" Edge executor takes over (I believe that will be the > >>>> case). > >>>>> However, this significantly crosses the current boundaries of which > >>>>> component does what. > >>>>> > >>>>> Not to mention the security implications - the JWT token generator in > >> your > >>>>> solution works on the triggerer, and that pretty much breaks the (future) > >>>>> assumption that the Triggerer does not need to know the secret necessary to > >>>>> generate the token - and has a lot of implications (for multi-team, for > >>>> example, > >>>>> but not only) - generally, a lot of the future task isolation work is > >>>>> based on the assumption that user code has no chance to see the secret to generate > >> the > >>>>> tokens. > >>>>> > >>>>> While I think this is a good temporary solution for you, I understand > >> the > >>>>> use case and its merit; it's not a niche one. Pretty much anyone with > >> KPO > >>>>> and lots of deferred tasks will have this case. But I've been thinking > >>>>> outside of the box, actually. I am not sure if I'm right, but this seems > >>>> to > >>>>> *really* be a problem with how KPO's XCom passing is done via the > >> sidecar > >>>>> for this Triggerer -> next() handover. But it does not have to be this > >>>> way. > >>>>> Conceptually, I see absolutely no problem saving the XCom where it > >>>> belongs: > >>>>> either the DB or the XCom Backend. This operation is almost exclusively > >>>>> I/O-bound, so it can easily be done asynchronously. It could be done in > >>>>> KPOTrigger via the supervising process (which has DB access) instead of > >>>>> passing things back for scheduling.
If KPOTriggerer sees that the Pod > >> is > >>>>> completed, it could simply ask the supervisor to perform all the tasks > >>>>> currently done in KPO's `trigger_reentry`. And then it would not even > >>>> cause > >>>>> any re-entry: KPOTriggerer could wait for the supervisor to perform the > >>>>> action and write to the XCom database or XCom backend, and would simply > >>>>> "complete" the task. The Triggerer could even have a separate event loop > >> for > >>>>> such "finalization," and all of it could be done asynchronously to > >> scale > >>>>> things. > >>>>> > >>>>> There would be no "reentry" workload to run at all, because all > >>>> completion > >>>>> could happen in the Triggerer. And I think the Triggerer, similar to > >> the > >>>>> worker, can communicate with all components. It has access to the DB > >>>>> (through the triggerer supervisor process) and can also communicate with > >> the > >>>>> XCom backend (you should be able to perform XCom push from the Triggerer > >>>>> supervisor, not necessarily from the event loop process). Aside from > >> probably > >>>>> much more network traffic for the Triggerer to save/retrieve XComs from > >>>>> multiple deferred KPOs, I can't see any problem with it (and it can be > >>>>> nicely scaled out by adding more triggerers). > >>>>> > >>>>> Or am I missing something obvious? > >>>>> > >>>>> J. > >>>>> > >>>>> > >>>>> On Sat, Mar 21, 2026 at 12:17 PM Jens Scheffler<[email protected]> > >>>> wrote: > >>>>>> Hey Niko, > >>>>>> > >>>>>> thanks as well for the response with reasoning. I understand that you > >> do > >>>>>> not prefer a coupling for a +0. > >>>>>> > >>>>>> Is there any other option (hint: some were listed under "Options we > >>>>>> considered") that you would propose as a better solution? Or is the +0 > >>>>>> just the "least of the worst"?
> >>>>>> > >>>>>> Jens > >>>>>> > >>>>>> On 20.03.26 02:03, Oliveira, Niko wrote: > >>>>>>> Hey Jens et al., > >>>>>>> > >>>>>>> That is three datapoints now so there is indeed some demand. Not > sure > >>>> if > >>>>>> that pushes it over the limit of needing this, but it's more > >> compelling > >>>> now. > >>>>>>> To your 2) in your last reply Jens: My concern wasn't Multi-Team > >>>>>> specifically or anything to do with config. More just that this > >> changes > >>>> the > >>>>>> (mostly unwritten) contract that the scheduler wholly owns and > >> interacts > >>>>>> with executors. Up until this change one could depend on this being > >> true > >>>>>> and that there is only ever one instance of any executor (per team > >> now, > >>>>>> since two teams can have the same executor between them). But now > your > >>>>>> changes are instantiating executors and queuing work with them. So > all > >>>>>> future executor/scheduling changes need to keep this in mind. It > >> simply > >>>>>> increases the surface area of execution/scheduling in Airflow and we > >> now > >>>>>> have a multi-writer situation which always brings extra complexity. > If > >>>>>> there is enough demand then we should do it. Seems to be a few > >>>> requesters > >>>>>> now, which I think brings me to a +0 on this one. > >>>>>>> Thanks again for the discussion! > >>>>>>> > >>>>>>> ________________________________ > >>>>>>> From: André Ahlert<[email protected]> > >>>>>>> Sent: Thursday, March 19, 2026 1:15 PM > >>>>>>> To:[email protected] <[email protected]> > >>>>>>> Subject: RE: [EXT] [DISCUSS] PR: Option to directly submit to > worker > >>>>>> queue from Triggerer > >>>>>>> CAUTION: This email originated from outside of the organization. Do > >> not > >>>>>> click links or open attachments unless you can confirm the sender > and > >>>> know > >>>>>> the content is safe. > >>>>>>> AVERTISSEMENT: Ce courrier électronique provient d’un expéditeur > >>>>>> externe. 
Ne cliquez sur aucun lien et n’ouvrez aucune pièce jointe si > >>>> vous > >>>>>> ne pouvez pas confirmer l’identité de l’expéditeur et si vous n’êtes > >> pas > >>>>>> certain que le contenu ne présente aucun risque. > >>>>>>> Hi all, > >>>>>>> > >>>>>>> This discussion resonates with problems we've seen on a fintech > >>>> client's > >>>>>>> setup: heavy KPO usage in deferred mode, Celery executor, pools with > >>>>>>> include_deferred=True. When a batch of tasks finishes on the cluster > >>>> and > >>>>>>> flips back to SCHEDULED, the pool slots are released and tasks have > >> to > >>>>>>> recompete through the scheduler for what amounts to seconds of > >> cleanup > >>>>>>> work. In our case the cloud cost was not the main concern, but it > >>>> raised > >>>>>> a > >>>>>>> real architectural question: why does a task that already passed all > >>>>>>> concurrency checks need to go through the entire scheduling loop > >> again > >>>>>> just > >>>>>>> to collect XCom and delete a pod? > >>>>>>> > >>>>>>> On the architectural concerns, I think the mini-scheduler comparison > >>>> from > >>>>>>> Airflow 2 does not hold. That feature made full scheduling decisions > >>>>>>> (concurrency, pools, priority). This PR makes none. It re-enqueues a > >>>> task > >>>>>>> that already satisfied all those checks. The task already had a > >> slot, > >>>>>>> and was already running. We are just sending it back to finish. > >>>>>>> > >>>>>>> The fact that it is opt-in with a safe fallback to SCHEDULED makes this > >>>> very > >>>>>>> low risk. If nobody configures direct_queueing_executors, behavior is > >>>>>>> identical to today. Marking it experimental for a release or two > >> would > >>>>>> also > >>>>>>> be fine. > >>>>>>> > >>>>>>> On the use case being narrow: any deployment using deferred mode with > >>>>>>> Celery at scale will eventually hit this. Issue #57210 shows > >>>> independent > >>>>>>> reports of the same pattern.
It is inherent to the DEFERRED -> > >>>> SCHEDULED > >>>>>> -> > >>>>>>> QUEUED state machine under pool contention, not specific to one > >> setup. > >>>>>>> +1 (non-binding) on getting this into main. > >>>>>>> > >>>>>>> Thanks Jens for bringing this one. > >>>>>>> > >>>>>>> André Ahlert > >>>>>>> > >>>>>>> On Thu, Mar 19, 2026 at 14:24, Jens Scheffler< > >>>> [email protected] > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Niko, > >>>>>>>> > >>>>>>>> thanks for the feedback. > >>>>>>>> > >>>>>>>> (1) I think the use case is not really narrow; @tirkarthi also > >> pointed > >>>>>>>> to issue https://github.com/apache/airflow/issues/57210 - so this > >>>> would > >>>>>>>> be closed as well. > >>>>>>>> > >>>>>>>> (2) I aimed to include support for Multi-Team (that was even adding > >>>> some > >>>>>>>> complexity compared to the local patch in 3.1.7). Yes, so the config > >>>>>>>> property is atm global, such that if you set it enabled for > >>>>>>>> CeleryExecutor it would be for all Executors in all teams. But if > >> you > >>>>>>>> wish and see a reason, we can also model the config as team-specific > >>>>>>>> (e.g. only team_a uses CeleryExecutor and team_b does not optimize - > >>>>>>>> though not sure if there is a need to separate). Routing and > >> queueing > >>>>>>>> for sure respect the correct Executor/queue instance to route to > >>>>>>>> in the PR. > >>>>>>>> See > >>>>>>>> > >> > https://github.com/apache/airflow/pull/63489/changes#diff-c30603fe0a3527e23af541ba115c91c85c9c213e6d105af6a48c88a7018a5799R333 > >>>>>>>> and > >>>>>>>> > >> > https://github.com/apache/airflow/pull/63489/changes#diff-c30603fe0a3527e23af541ba115c91c85c9c213e6d105af6a48c88a7018a5799R347 > >>>>>>>> Jens > >>>>>>>> > >>>>>>>> On 18.03.26 23:34, Oliveira, Niko wrote: > >>>>>>>>> Thanks for the write-up Jens, it helps to have the full context of > >>>> your > >>>>>>>> thought process.
> >>>>>>>>> The code changes themselves are small and fairly elegant. But this > >>>>>>>> breaks the invariant that there is only ever one instance of each > >>>>>> executor > >>>>>>>> (per team), and that they live in the scheduler process and the > >>>>>> scheduler is > >>>>>>>> the only thing that interacts with them in a scheduling capacity. To > >>>> me > >>>>>>>> it's quite a large logical change in Airflow behaviour/operation. When > >>>>>>>> reasoning about execution and scheduling there is now another source > >>>>>> that > >>>>>>>> will always need to be considered, ensuring it works and is tested > >>>> when > >>>>>>>> executor related changes are made, etc. This has been fraught for us > >>>> in > >>>>>> the > >>>>>>>> past in ways that were hard to predict beforehand. > >>>>>>>>> The use case seems quite narrow and focused on how you folks use > >>>>>> Airflow. > >>>>>>>> Are you hearing from any other users who are asking for something > >> like > >>>>>>>> this? I'm just not sure I see enough evidence that it truly belongs > >> in > >>>>>>>> apache/airflow main. > >>>>>>>>> Cheers, > >>>>>>>>> Niko > >>>>>>>>> > >>>>>>>>> ________________________________ > >>>>>>>>> From: Jens Scheffler<[email protected]> > >>>>>>>>> Sent: Wednesday, March 18, 2026 2:56 PM > >>>>>>>>> To:[email protected] <[email protected]> > >>>>>>>>> Subject: [EXT] [DISCUSS] PR: Option to directly submit to worker > >>>> queue > >>>>>>>> from Triggerer > >>>>>>>>> Dear Airflow Devs! > >>>>>>>>> > >>>>>>>>> TLDR: Because of operational problems in processing workload, we > >>>> propose > >>>>>>>>> an extension allowing tasks to be re-queued directly from the triggerer. > >> The > >>>> PR > >>>>>>>>> raised demand for discussion, to ensure awareness of the change. > >>>>>>>>> Pull Request: Allow direct queueing from triggerer > >>>>>>>>> <https://github.com/apache/airflow/pull/63489#top> #63489 > >>>>>>>>> https://github.com/apache/airflow/pull/63489 > >>>>>>>>> > >>>>>>>>> The Use Case/Problem Statement: > >>>>>>>>> > >>>>>>>>> We use Airflow for many workflows with long and large Dags at scale, > >>>>>>>>> running 80% KPO workload. To ensure KPO can run scaled and long w/o > >>>>>>>>> operation interruptions (worker restarts due to re-deployment, Pods > >>>> with > >>>>>>>>> workload sometimes running 4-10h), and to be able to scale to > >>>> thousands > >>>>>>>>> of running KPO Pods, we need to use and leverage deferred mode > >>>>>>>> extensively. > >>>>>>>>> In KPO with deferred mode, a task is first scheduled to a (Celery in our > >>>>>> case) > >>>>>>>>> worker, which prepares the Pod manifest and starts the Pod. From > >> there > >>>>>> it > >>>>>>>>> hands over to the triggerer, which monitors the running Pod and tails > >> the > >>>>>> log > >>>>>>>>> so that a user can watch progress. Once the Pod is completed, it > >>>> returns > >>>>>>>>> back to a (Celery) worker that finishes up the work, extracts XCom, > >>>>>>>>> performs error handling and cleans up the Pod from K8s.
> > > This also means the Pod is only finished once the XCom is pulled
> > > from the side-car: the "base" container might be completed, but the
> > > Pod is only done and deleted when the XCom has been collected. Until
> > > KPO collects the XCom, the Pod keeps running.
> > >
> > > The current scheduling method in Airflow is that the scheduler
> > > checks all concurrency rules (max_active_tasks, max_tis_per_dagrun,
> > > pools...) while the task is in state "scheduled", before it is
> > > queued to be started. When started on the worker, the task is
> > > directly set to "deferred" and a triggerer then picks it up (no
> > > re-scheduling or active distribution to a triggerer). On the way
> > > back, today the triggerer marks the task "scheduled", which means
> > > the scheduler logic needs to pick the task up again, in competition
> > > with all other workload, and re-schedule it with all the concurrency
> > > and priority checks from the initial pass before it reaches "queued"
> > > and is re-assigned to a (Celery) worker. Implicitly this means that
> > > on the transition from "deferred" to "scheduled" the task loses its
> > > allocated pool slot and needs to re-allocate it.
> > >
> > > In most regular situations this is okay. In our scenario it is a
> > > problem: we have many Dags competing for the K8s cluster resources,
> > > and the concurrency features of Airflow, combined with priority
> > > controls, should ensure that important workload runs first. Once
> > > there is residual capacity, less important batches can consume
> > > cluster resources. "Consume resources" also refers to Pods sitting
> > > on the cluster: they free up cluster space only at the point when
> > > the XCom is collected and the Pod is removed. Until then they still
> > > consume CPU and ephemeral storage allocations.
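The round trip described above can be sketched as a toy state machine (plain Python, not Airflow code; the state names follow the message, the function itself is purely illustrative):

```python
from enum import Enum

class TIState(Enum):
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    DEFERRED = "deferred"

def deferrable_lifecycle():
    """Illustrative round trip for a deferrable task such as KPO.

    Note the second pass through SCHEDULED: at that point the task has
    given up its pool slot and competes with all other workload again.
    """
    path = [
        TIState.SCHEDULED,  # scheduler checks pools/concurrency, then queues
        TIState.QUEUED,     # picked up by a (Celery) worker
        TIState.RUNNING,    # worker creates the Pod, then defers
        TIState.DEFERRED,   # triggerer monitors the Pod, tails logs
        TIState.SCHEDULED,  # trigger fired: back to square one, pool slot lost
        TIState.QUEUED,     # only reached after passing all checks again
        TIState.RUNNING,    # worker collects XCom, cleans up the Pod
    ]
    return [s.value for s in path]
```

The two occurrences of "scheduled" in the returned path are exactly the double pass through the scheduler that the proposal below tries to avoid.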
> > > We limit the amount of workload that can be sent to K8s via Airflow
> > > pools, which are the ultimate concurrency limit for the different
> > > node pools (e.g. nodes with GPU and nodes without GPU). Other
> > > workload often runs on Edge workers or directly as Python in Celery.
> > >
> > > With multiple Dags and different priorities we saw these two
> > > effects:
> > >
> > > (1) A lower-priority batch is running ~N*100 Pods in deferred state.
> > > A large higher-priority batch is started. Pods finishing from the
> > > lower-priority tasks are expected to drain the cluster, but when
> > > they end, the task instances are set to "scheduled" and... then stay
> > > there until all tasks of the higher-priority batch are worked off
> > > (assuming the higher-priority tasks are not limited, leaving room
> > > for the lower-priority ones). So the base containers of the Pods are
> > > completed, but the XCom side-cars wait a long time - we have seen
> > > even 24h - for the XCom to be collected and the Pods cleaned up.
> > >
> > >         (a) An additional side effect: if pending long enough, the
> > > AutoScaler might pick such a node as a scaling victim because it
> > > looks truly idle, and after the grace period it kills the Pod.
> > > Later, when the workload returns to the worker, the Pod shows an
> > > HTTP 404 as being gone and the XCom is lost... In most cases a retry
> > > has to run; even then it means a delay and additional hours of
> > > re-execution. Without a retry it just surfaces failures to users.
> > >
> > >         (b) We saw that newer high-priority workload was not
> > > scheduled by K8s to the (almost idle) nodes because the previous
> > > pending Pods still held ephemeral storage allocations and there was
> > > not sufficient space on the K8s nodes for the new workload...
> > > So the old Pods blocked the new ones, and the higher-priority task
> > > instances blocked the cleanup of the lower-priority instances. A lot
> > > of tasks ended up in a kind of deadlock.
> > >
> > >         (c) As the reset to state "scheduled" from the triggerer
> > > also sets the "scheduled" date of the task instance, other pending
> > > scheduled task instances from the same "low-priority" Dag are often
> > > started earlier. So workers pick up new tasks and start new Pods,
> > > while a lot of old Pods sit there idle, waiting for their XCom to be
> > > collected so they can be cleaned up.
> > >
> > > (2) Sometimes, because of operational urgency, we also use the
> > > enable/disable scheduling flag on Dags in the UI to administratively
> > > turn off Dag scheduling and leave space for other Dags... or to
> > > drain the cluster for operational procedures, e.g. to have safe
> > > ground before maintenance. But the Dag needs to be actively
> > > scheduled to process the return from the triggerer. If you turn off
> > > scheduling, the workload in flight never finishes and gets stuck as
> > > described before: Pods are stale on the cluster and nobody picks up
> > > the XCom. The problematic part is that there is also no way to
> > > "clean up" such tasks and finish these Dags without turning
> > > scheduling back on... but then new tasks are queued as well, and you
> > > are just not able to drain the cluster. I know we discussed multiple
> > > times that we might need a "drain" mode that lets existing Dag runs
> > > finish without scheduling new ones... but such a feature is also
> > > missing. In short: scheduling new tasks is tightly coupled with the
> > > scheduling of cleanup. It is not possible to separate them.
> > > This leads to the problems in 1 (a-c) as well as in scenario (2).
> > >
> > > We thought for a while about which options we would have to
> > > contribute a general improvement. The assumption and precondition is
> > > that the initial start on the (Celery) worker is fast, most time is
> > > spent (once) on the triggerer, and the return to the worker actually
> > > takes only a few seconds to clean up. And of course we want to
> > > minimize latency, to (I) free the allocated resources and (II) avoid
> > > any additional artificial delay for the user - which somewhat
> > > contradicts the effort of flipping from worker to triggerer and back
> > > again.
> > >
> > > Options we considered:
> > >
> > >     * Proposing a new task instance state, e.g. "re-schedule", that
> > >       is handled with priority by the scheduler.
> > >       But the scheduler is already a big beast of complexity; adding
> > >       another loop to handle re-scheduled tasks on top of all the
> > >       existing complexity would add a lot, and adding another state
> > >       to the state model also adds a lot of overhead, from
> > >       documentation to UI...
> > >     * Finishing up the work on the triggerer without returning to a
> > >       worker - it is only about cleaning up the KPO and...
> > >       Unfortunately, anything more than monitoring is very complex
> > >       to implement, and XCom DB access in particular is not a
> > >       desired concept and is not supported by the triggerer. We also
> > >       have some specific triaging and error-handling automation
> > >       extended on top of KPO, all of which would be hard to
> > >       implement async with the limited capabilities of the
> > >       triggerer. The main blocker in this option is XCom access.
> > >     * Dynamically increasing the priority of a task returning from
> > >       the triggerer.
> > >       We considered "patching" the priority_weight value of the task
> > >       instance on the triggerer before the return, to ensure that
> > >       returning tasks are elevated in priority. First we did this
> > >       from the side via SQL (UPDATE task_instance SET
> > >       priority_weight=1000 WHERE state='scheduled' AND
> > >       next_method='trigger_reentry'), but if the task fails and
> > >       restarts, it is hard to find it and reset the priority; a
> > >       retry would also need to be reset down... It all feels like a
> > >       workaround.
> > >     * Implementing a special mode in the scheduler that treats tasks
> > >       with "next_method" set (the signal that they are returning
> > >       from the triggerer) in a special way... assuming they hold a
> > >       pool slot and skipping some of the concurrency checks (since
> > >       in "scheduled" state the pool slot is actually "lost").
> > >       But this is hard to really propose... it might be even harder
> > >       than the first option, and the already complex scheduler code
> > >       would get even harder with exceptions in the concurrency
> > >       handling... with the risk that such special cases exceed the
> > >       planned concurrency limits when the pools are already
> > >       exhausted.
> > >     * Adding a REST API that the triggerer can call on the scheduler
> > >       to cross-post workload.
> > >       That would require a new connection and component bundling,
> > >       and a REST API endpoint would need to be added for schedulers
> > >       to receive these push calls. Probably an alternative, but it
> > >       also adds an architectural dependency.
> > >     * The PR we propose to discuss here: if the task skips the
> > >       "scheduled" state and moves to "queued" directly, the pool
> > >       slot stays allocated (assuming that "deferred" actively counts
> > >       into pool and concurrency limits). The code looked not too
> > >       complex, as just the enqueuing logic from the scheduler could
> > >       be integrated.
> > >       This takes into account that such direct queueing is only
> > >       possible if the executor supports queues (it does not work for
> > >       LocalExecutor!), so the proposed PR made it explicitly opt-in.
> > >
> > > Reasons (and pro-arguments) why we propose to have the PR on main:
> > >
> > >     * Because of the many recent operational problems, we tested
> > >       this and patched it locally into our 3.1.7 triggerer. It has
> > >       worked smoothly in production for about a week.
> > >     * If something goes wrong or the executor is not supported, the
> > >       existing path of setting the task to "scheduled" is always
> > >       used as a safe fallback.
> > >     * It is selective and an opt-in feature.
> > >     * We dramatically reduced the latency from Pod completion to
> > >       cleanup, from sometimes 6-24h to a few seconds.
> > >     * We assume the cleanup on return to the worker is only a small
> > >       effort, so there is no harm even if some limits are
> > >       temporarily exceeded.
> > >     * ...Frankly speaking, the concurrency limits and pools were
> > >       checked initially at start time. Limiting the later cleanup on
> > >       concurrency limits adds no benefit, just delays and problems.
> > >       We just want to finish up the work.
> > >     * Actually, real over-loading is not possible, as the Redis
> > >       queue is still in between - any free Celery worker will pick
> > >       up the task.
> > >       Even under over-load it will just sit in the Celery queue for
> > >       a moment.
> > >     * It is a relatively small change.
> > >     * It off-loads the scheduler by 50% for all deferred tasks (they
> > >       need to pass the scheduler only once).
> > >     * Due to the reduced cleanup latency, more "net workload" is
> > >       schedulable on the cluster: higher cloud utilization, less
> > >       idle time.
> > >
> > > Hearing the feedback on the PR and playing devil's advocate, I can
> > > understand the counter-arguments:
> > >
> > >     * In Airflow 2 there was a mini-scheduler, and there was a hard
> > >       fight in Airflow 3 to get rid of it!
> > >       Understood. But we do not want to add a "mini scheduler"; we
> > >       just want to use parts of the executor code to push the task
> > >       instance to the queue and skip scheduling. The goal is NOT to
> > >       do anything more or to schedule anything else.
> > >     * This would skip all concurrency checks and potentially
> > >       over-load the workers!
> > >       No. Concurrency rules are checked when the workload is
> > >       initially started. I know there are parallel bugs we are
> > >       fighting to ensure the deferred state is counted into
> > >       concurrency on all levels so that limits are kept correctly.
> > >       Assuming you enable counting deferred into pools, a direct
> > >       re-queue to a worker just keeps the level of concurrency; it
> > >       does not add more workload, it only transfers it back. And
> > >       Celery, for example, has a queue, so there is no real
> > >       over-loading. It is mainly intended to clean up workload,
> > >       which is a low-effort task.
> > >     * We plan to cut off components and untangle package
> > >       dependencies. After the worker, the Dag parser and triggerer
> > >       are next. Linking to the executor defeats these plans!
> > >       Yes, understood. But today the setting of the task_instance
> > >       also uses direct DB access... and in such surgery it would
> > >       need to be cut to the level that the DB access moves to the
> > >       execution API back-end. So the cut for re-queueing would move
> > >       to the execution API in the future, not the triggerer. I think
> > >       it would be valid to think about how this might evolve if such
> > >       a split is made.
> > >     * This option is risky, and we have concerns that people will
> > >       hit more errors.
> > >       The feature is opt-in and needs to be configured. By default,
> > >       as proposed in the PR, it is not active. It would also be
> > >       acceptable to mark it experimental for a while.
> > >
> > > Sorry for the somewhat long text. Happy to get feedback.
> > >
> > > Jens
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: [email protected]
> > > For additional commands, e-mail: [email protected]
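The opt-in fast path proposed in #63489 can be sketched roughly as follows (all names and the config flag here are hypothetical stand-ins, not the PR's actual API; the point is the fallback behaviour described in the thread: queue directly when the executor supports queues, otherwise use the existing "scheduled" path):

```python
# Hypothetical sketch of the opt-in direct-queue behaviour discussed in
# the thread. DIRECT_QUEUE_ENABLED stands in for the proposed config flag.
DIRECT_QUEUE_ENABLED = True

class Executor:
    """Toy executor; `supports_queues` mirrors the constraint that the
    fast path cannot work for e.g. LocalExecutor."""
    supports_queues = True

    def __init__(self):
        self.queued = []

    def queue_task(self, ti):
        self.queued.append(ti)

def resume_from_triggerer(ti, executor):
    """Route a task instance whose trigger has fired.

    Fast path: push straight to the executor queue, keeping the pool
    slot allocated. Safe fallback: the existing behaviour of setting the
    task back to "scheduled" for the scheduler to pick up again.
    """
    if DIRECT_QUEUE_ENABLED and executor.supports_queues:
        ti["state"] = "queued"
        executor.queue_task(ti)
    else:
        ti["state"] = "scheduled"  # normal scheduler round trip
    return ti["state"]
```

With `supports_queues` set to False (or the flag off), the function degrades to today's behaviour, which is what makes the feature safe to ship as opt-in.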
