I like the proposal overall and may be bikeshedding this - and if so I do 
apologize - but I feel it should be more explicit, either @async-task or 
@task.async (to match the existing @task.batch pattern)... but I don't feel 
particularly strongly about that.

 - ferruzzi 

On 2025/12/16 19:30:26 Blain David wrote:
> Hello Jarek, you completely nailed it with your explanation, that's indeed 
> what I wanted to achieve.
> 
> And yes, as Daniel also suggested, you could achieve the same by just 
> wrapping the async code in an async method and using asyncio.run() in a 
> regular PythonOperator task (like in your first example).  But as I 
> explained, that would have certainly worked in Airflow 2 but not in 3, due to 
> Task SDK.  Of course this also depends on how the async Hook operations are 
> implemented.
> Besides that, my main goal is simply to be able to execute async callables 
> with the PythonOperator without having to worry about event-loop management.
> 
> In the PR, the PythonOperator explicitly detects whether the provided 
> callable is synchronous or asynchronous and chooses the correct execution 
> path accordingly. The same applies to @task-decorated functions: when a task 
> is defined as async, it is executed as a coroutine. This logic is centralized 
> entirely in the PythonOperator; the decorators merely delegate to it. As a 
> result, this behavior is fully covered by the PR and demonstrated in the 
> examples included in the AIP proposal.
> 
> Regarding the iteration / looping aspect shown in the examples: internally, 
> we’ve abstracted this away using our own IterableOperator.
> So there, Jarek, you also completely understood correctly what I want to 
> achieve, but this could be a future proposal and is out of the scope for 
> AIP-98.
> AIP-98 only focusses on the ability to allow the PythonOperator to execute 
> async callables out of the box as a first-class citizen and have a 
> BaseAsyncOperator (PythonOperator extends BaseAsyncOperator already in my PR) 
> so we can later on extend this feature not only on the PythonOperator but to 
> other operators as well.
> 
> Regarding the more sophisticated looping abstraction I want to achieve as you 
> state, this would mean dag authors don’t have to think about concurrency, 
> iteration, or async execution at all. I also understand your earlier point we 
> discussed about preferring mapped tasks when UI visibility is required. In 
> our case, we don’t need the same level of UI granularity, so this trade-off 
> is acceptable for us and in return we have far much better performance, so at 
> the moment it doesn't offer the same UI monitoring and retry mechanism as the 
> mapped tasks mechanism does.
> 
> To illustrate, here’s how your example would look using our IterableOperator 
> (which actually started AIP-88 discussion in the very beginning).
> Instead of calling expand, we simply call iterate on the OperatorPartial:
> 
> class TheHookToRun:
>     async def method_to_run(self, value):
>         ...
> 
> @task
> def lots_of_values():
>     return list(range(0, 10000))
> 
> @task
> async def the_task(value):
>     conn_id = "a_connection"
>     result = await TheHookToRun(conn_id).method_to_run(val=value)
>     return result
> 
> the_task.iterate(value=lots_of_values())  # here we use iterate instead of 
> expand
> 
> In this setup, the async aspects are completely transparent to the user — 
> everything is handled out of the box by the IterableOperator, and the task 
> author does not need to reason about event loops or execution semantics like 
> concurrency at all.  The IterableOperator solution is not only applicable for 
> async callables but also for regular synchronous callables.
> 
> David
> 
> -----Original Message-----
> From: Jarek Potiuk <[email protected]> 
> Sent: 16 December 2025 00:55
> To: [email protected]
> Cc: Philippe Gagnon <[email protected]>
> Subject: Re: [PROPOSAL] Rethinking deferrable operators, async hooks and 
> performance in Airflow 3 by supporting native async PythonOperator
> 
> EXTERNAL MAIL: Indien je de afzender van deze e-mail niet kent en deze niet 
> vertrouwt, klik niet op een link of open geen bijlages. Bij twijfel, stuur 
> deze e-mail als bijlage naar [email protected]<mailto:[email protected]>.
> 
> This is a good point Philippe - that one comment clicked why I had a bit of 
> struggle with the proposal and maybe this is something that we all think this 
> proposal is about - but it's not clear without examples.
> 
> So let me rephrase how I understand it and see if we can agree on it - with 
> examples.
> 
> For me `@task` is always synonymous with single "process" execution - even 
> when mapped - the single execution of a method in @task clearly starts when 
> the process starts and when it ends - the process exits.
> There is - I think - no particular reason why we should change that. One 
> `@task` = one process.
> 
> Now - currently, the task is just a simple synchronous callable run by 
> PythonOperator in the main thread, so yes, indeed you cannot easily execute 
> async coroutines in it without adding other async methods that you call 
> explicitly with event loop, because you need to run it within an event loop. 
> So you can do this now:
> 
> class TheHookToRun:
>        async def method_to_run():
> 
> async def async_part_of_the_task(conn_id, array_of_values):
>     parts = [ TheHookToRun(conn_id).method_to_run(val=value) for value in 
> array_of_values]
>     results = await asyncio.gather(*parts)
> 
> @task()
> def the_task(array_of_values):
>     conn_id = "a_connection"
>         asyncio.run(async_part_of_the_task(conn_id, array_of_values)
> 
> 
> And I think what we could easily do and **all** that we need is to change the 
> @task decorated methods to be able to run as coroutine, so conceptually we 
> want to do this, I think:
> 
> class TheHookToRun:
>        async def method_to_run():
> 
> @task()
> async def the_task(array_of_values):
>     conn_id = "a_connection"
>     parts = [ TheHookToRun(conn_id).method_to_run(val=value) for value in 
> array_of_values]
>     results = await asyncio.gather(*parts)
> 
> So - what we want to do is to mark `the_task` as async - using standard 
> python semantics and make sure that when callable is passed to PythonOperator 
> is async, we run:
> 
> asyncio.run(callable())
> 
> instead of
> 
> callable()
> 
> We only need to check if the callable is sync or async (something that for 
> example fast_api does and we had recent problems with in a recent version 
> when they got it wrong).
> 
> I think this is basically all we need to achieve what you wanted David.
> Maybe (I am not sure) you also wanted to do looping  somehow outside of the 
> `@task` callable, do something more sophisticated here, but IMHO just 
> explicitly allowing async task-decorated methods and making sure they run in 
> an event loop addresses exactly what you wanted to achieve.
> 
> But maybe you thought about something else David - and can provide an example 
> how it would look like in Dag ?
> 
> J.
> 
> 
> 
> On Mon, Dec 15, 2025 at 6:09 PM Philippe Gagnon via dev < 
> [email protected]> wrote:
> 
> > Hi,
> >
> > > There async tasks will benefit from the multi threading, as they 
> > > share
> > the same event loop and everything is run within the same Celery 
> > worker, but that's another solution.
> >
> > I'm not sure that's accurate; every task runs within its own process. 
> > That being said, I think the ability to run async callables via 
> > PythonOperator (or any other operator for that matter) is valuable in 
> > and of itself, and I don't think there would be much pushback if we 
> > adopted interfaces to facilitate this. WDYT about reducing the scope 
> > of your proposal (and PR) to this framing, that way it would be easier to 
> > gain consensus?
> >
> > (I think that's essentially what you are pitching 🙂)
> >
> > BR,
> >
> > *✨ **Philippe Gagnon*
> > *Meet for 30 mins 📅* 
> > <https://eur01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcal
> > endar.app.google%2F5qzgD9SadybUvSCv8&data=05%7C02%7Cdavid.blain%40infr
> > abel.be%7C5b39dc7254d841e56d3f08de3c35adac%7Cb82bc314ab8e4d6fb18946f02
> > e1f27f2%7C0%7C0%7C639014398414722610%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0e
> > U1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIld
> > UIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=4MuD3ytW04UljU8B9UDilgioTOGrDdXrLkIm0
> > d1A0nU%3D&reserved=0>
> >
> >
> > On Fri, Dec 5, 2025 at 2:49 AM Blain David <[email protected]>
> > wrote:
> >
> > > Hello Jense,
> > >
> > > Thanks for your time and answer.  I just granted you and Zhe-You 
> > > access
> > to
> > > the document.
> > >
> > > In the article I explained why we did the iteration ourselves within 
> > > the async @task decorated function, as this was way faster than 
> > > doing it with dynamic task mapping.
> > > Not that you cannot use dynamic task mapping with async 
> > > PythonOperator,
> > it
> > > just works as with any other operator, it's just doesn't make sense 
> > > as it won't give you any performance benefits due to the fact that 
> > > you don't share the same event loop (at least when using the 
> > > CeleryExecutor).
> > >
> > > You could for example on big streams of data use dynamic tsk mapping 
> > > to chunk the stream in multiple pieces and then each task would 
> > > process the chunk within the async operator for example, a bit like 
> > > partition if you like.
> > >
> > > In the example I used for the article, we once again don't care 
> > > about individual download state of the FTP-file, we just want to 
> > > know if the directory was successfully downloaded or not, ofc we 
> > > added some logging statements to show which file was downloaded.
> > > I also know Jarek wants individual state tracking, but that' not the 
> > > solution I presented here, for micro batching we have our
> > IterableOperator,
> > > which instead of doing partial/expand we do partial/iterate, which
> > actually
> > > does the same as the for loop of the example in the article but then 
> > > managed for you in a multi threaded way for sync as async tasks as 
> > > well.  There async tasks will benefit from the multi threading, as 
> > > they share the same event loop and everything is run within the same 
> > > Celery worker, but that's another solution.
> > >
> > > Still with the dynamic task mapping or IterableOperator, you 
> > > wouldn't be able to use the SFTPClientPool (before name 
> > > AsyncSFTPConnectionPool as in the article), so you wouldn't benefit 
> > > of the performance gain you get
> > from
> > > the pool, that why here in this example, we do the looping 
> > > ourselves.
> > >
> > > And I completely agree for triggerers, we also use it a lot, and it 
> > > is indeed cool for long running tasks in which have lot's of waiting 
> > > times (dead time), and you're just monitoring a state, that the 
> > > purpose of triggerers!
> > > But with some operators, triggers are misused as they are the "only" 
> > > way to run async code which returns a lot of data which have to come 
> > > back to the operator so it can be exposed as an XCom, there you'll 
> > > see that you trigger table in the Airflow database will explode 
> > > fast, As each yielded response is being stored in the database, as 
> > > it can't
> > make
> > > use of a custom XCom backend like operators do, so in practise, each
> > result
> > > yielded by the trigger, will first end up in your tirgger table 
> > > before being stored as an XCom.
> > >
> > > Also, I'm not telling here to replace anything, I just propose an 
> > > alternative solution in Airflow so you're not purely tied to 
> > > deferrable operators.  Also at the Summit, we experienced during a 
> > > lot of presentations, that people tend to use more the 
> > > PythonOperators (or
> > @task)
> > > with hook than
> > > using the operators themselves, which in my opinion makes sense as 
> > > you have more flexibility, you still benefit from the Airflow 
> > > integration
> > with
> > > Hooks but you aren't tight to the operator implementation, which 
> > > might offer limited operations, look for example at the SFTPOperator 
> > > and you'll understand immediately.  That's why I propose to allow 
> > > running async code in PythonOperators natively, that way, you can 
> > > directly interact with
> > async
> > > hooks and you don't need a triggerer to do that.  Triggers are great 
> > > for polling and listening, but not for processing huge amounts of 
> > > data, that's where celery workers shine, thus allowing 
> > > PythonOperators to natively run async code in you celery workers, you can 
> > > do so.
> > >
> > > For you last example, in our case DB calls are still sync, as of my 
> > > knowledge, we don’t have any DB hook based on DBApiHook which 
> > > supports async operations?  Also the db operation can be seen as 
> > > another sync
> > task,
> > > so they don't need to run in the same async task, you just pass the 
> > > XCom returned from the async task to the sync task.  But having 
> > > async DB hooks could be cool, I also though about it, but this would 
> > > also depend on the driver if it supports it, still it also something 
> > > I would like to test in the near future.
> > >
> > > I hope this answered most of your questions Jens 😉
> > >
> > > -----Original Message-----
> > > From: Jens Scheffler <[email protected]>
> > > Sent: 04 December 2025 21:12
> > > To: [email protected]
> > > Subject: Re: [PROPOSAL] Rethinking deferrable operators, async hooks 
> > > and performance in Airflow 3 by supporting native async 
> > > PythonOperator
> > >
> > > EXTERNAL MAIL: Indien je de afzender van deze e-mail niet kent en 
> > > deze niet vertrouwt, klik niet op een link of open geen bijlages. 
> > > Bij twijfel, stuur deze e-mail als bijlage naar [email protected]<mailto:
> > > [email protected]>.
> > >
> > > Requested access to Google doc to read more details. Am interested 
> > > and also as Daniel what the difference is/would be.
> > >
> > > Especially also as progress tracking might be important. Yes, Task
> > Mapping
> > > is very expensive if you want to download 17k XML files, but also 
> > > when running Async and you are at 5000 files, if you resume would 
> > > you know
> > what
> > > was complete or would it start from scratch all times?
> > >
> > > I think such micro-batching is cool but some state tracking is 
> > > important
> > > - which might if it is in the DB also overload the DB or add very 
> > > many transactions.
> > >
> > > Trioggerer though I think still is cool for long running tasks where 
> > > you just wait for response, e.g. you triggered another job remote or 
> > > you started a Pod that runs for an hour. We have Pods runnign for 
> > > 10h
> > sometimes
> > > and then it is important to be able to roll new SW to workers and 
> > > with triggerers we cann de-couple this.
> > >
> > > So maybe - without missing details - I would judge such 
> > > micro-batching as a third execution option but most probably would not 
> > > replace the others.
> > >
> > > Also knowin from own experience, writing async code is more complex 
> > > and error prone, so if you would request all normal code being async 
> > > you
> > might
> > > scare users away. Proper review needed to ensure all IO is async 
> > > (also DB
> > > calls!)
> > >
> > > On 12/4/25 18:08, Daniel Standish via dev wrote:
> > > > Here's what I'm hearing from this
> > > >
> > > > 1. not using task mapping, but just looping instead, can be much 
> > > > more efficient.
> > > > Yes, of course it can.
> > > >
> > > > 2. there are ways in which triggerer / deferrable operators are 
> > > > not fully complete, or do not fully have feature parity with 
> > > > regular operators (such as the custom xcom backend example) I 
> > > > believe it.  But this could certainly be worked on.
> > > >
> > > > Question for you:
> > > >
> > > > How is your proposal different / better than say, just calling 
> > > > `asyncio.run(...)` in a python task?
> > > >
> > > >
> > > > On Thu, Dec 4, 2025 at 8:38 AM Blain David 
> > > > <[email protected]>
> > > wrote:
> > > >
> > > >> As I already discussed with Jarek in the past but also with 
> > > >> Hussein during the Airflow Summit, we at a certain moment 
> > > >> encountered performance issues when using a lot of deferred operators.
> > > >>
> > > >> Allowing PythonOperators (and thus also @task decorated methods) 
> > > >> to natively execute async Python code in Airflow solved our 
> > > >> performance
> > > issues.
> > > >> And yes, you could argue if that’s really necessary and also 
> > > >> what’s the added value? And at first you would indeed think it 
> > > >> doesn’t make sense at all do so, right?
> > > >> But please bear with me first and hear me out first why we did it 
> > > >> that way and how it solved our performance issues and it will 
> > > >> become crystal clear 😉
> > > >> So below is the article I wrote, which is also publicly available 
> > > >> here< https://doc/ 
> > > >> s.google.com%2Fdocument%2Fd%2F1pNdQUB0gH-r2X1N_g774IOUEurowwQZ5OJ
> > > >> 7yiY 
> > > >> 89qok&data=05%7C02%7Cdavid.blain%40infrabel.be%7Ca90fcc33ab604f91
> > > >> 948f
> > > >> 08de33719fc0%7Cb82bc314ab8e4d6fb18946f02e1f27f2%7C0%7C0%7C6390047
> > > >> 6028 
> > > >> 6869534%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwL
> > > >> jAuM 
> > > >> DAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C
> > > >> %7C& 
> > > >> sdata=%2FNbnXz%2BWTH0WDp8lTic8sraokWDojaNYfr51I2ohy58%3D&reserved
> > > >> =0> on Google Docs  which makes it easier to read than through 
> > > >> the
> > devlist.
> > > >>
> > > >> Here is my article:
> > > >>
> > > >> Rethinking deferrable operators, async hooks and performance in 
> > > >> Airflow 3
> > > >>
> > > >> At our company, we strive to avoid custom code in Airflow as much 
> > > >> as possible to improve maintainability.
> > > >> For years this meant favouring dedicated Airflow operators over 
> > > >> Python operators.
> > > >> However, in Airflow 3, as the number of deferred operators in our 
> > > >> DAGs continued to grow, we began facing severe performance issues 
> > > >> with deferrable operators, which forced us to re-evaluate that
> > approach.
> > > >>
> > > >> Initially we expected deferrable operators to improve performance 
> > > >> for I/O-related tasks—such as REST API calls—because triggerers 
> > > >> follow an async producer/consumer pattern. But in practice we 
> > > >> discovered the
> > > opposite.
> > > >>
> > > >> Why Deferrable Operators Became the Bottleneck?
> > > >>
> > > >> Deferrable operators and sensors delegate async work to triggerers.
> > > >> This is perfectly fine for lightweight tasks such as polling or 
> > > >> waiting for messages on a queue.
> > > >>
> > > >> But in our case:
> > > >>
> > > >>
> > > >>    *   MSGraphAsyncOperator performs long-running async operations.
> > > >>    *   HttpOperator in deferrable mode can perform long-running HTTP
> > > >> interactions, especially if pagination is involved.
> > > >>    *   There is no native deferrable SFTPOperator, so if we want to
> > use
> > > the
> > > >> SFTPHookAsync, we must use the PythonOperator which natively 
> > > >> doesn’t support async code (not that big of challenge).
> > > >>    *   Both can return large payloads.
> > > >>    *   Triggerers must store yielded events directly into the Airflow
> > > >> metadata database.
> > > >>
> > > >> Triggerers are not designed for sustained high-load async 
> > > >> execution or large data transfers. Unlike Celery workers, 
> > > >> triggerers scale poorly and quickly become the bottleneck.
> > > >>
> > > >> Yielded events from triggers are stored directly in the Airflow 
> > > >> metadata database because, unlike workers, triggers cannot 
> > > >> leverage a custom XCom backend to offload large payloads, which 
> > > >> can lead to increased database load and potential performance 
> > > >> bottlenecks.
> > > >>
> > > >> Dynamic task mapping with deferrable operators amplifies the 
> > > >> problem even further which AIP‑88 partially solves.
> > > >> Triggerers also cannot be run on the Edge Executor as triggerers 
> > > >> are still tightly coupled with the Airflow metadata database 
> > > >> (possibly addressed in AIP‑92).
> > > >>
> > > >> Rethinking the approach: Async hooks + Python tasks
> > > >>
> > > >> These limitations led us to reconsider calling async hooks 
> > > >> directly from Python @task decorated functions or 
> > > >> PythonOperators, thus avoiding deferrable operators and thus 
> > > >> triggerers entirely.
> > > >> Operators are wrappers around hooks. Well‑written operators 
> > > >> should contain little logic and delegate all the work to the 
> > > >> hooks which do the real work,so  why not call them directly?
> > > >> This idea is also a bit in line with what Bolke already 
> > > >> presented< 
> > > >> https://eur01.safelinks.protection.outlook.com/?url=https%3A%2F%2
> > > >> Fairflowsummit.org%2Fslides%2F2023%2Fab1-1400-Operators.pdf&data=
> > > >> 05%7C02%7Cdavid.blain%40infrabel.be%7C5b39dc7254d841e56d3f08de3c3
> > > >> 5adac%7Cb82bc314ab8e4d6fb18946f02e1f27f2%7C0%7C0%7C63901439841474
> > > >> 7203%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAu
> > > >> MDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7
> > > >> C%7C&sdata=yxB9i4xCbGvDpYZ4VY3UDiRIpUe4pBqlL1xYFixKoXw%3D&reserve
> > > >> d=0> in
> > 2023.
> > > >>
> > > >> Advantages of this approach include:
> > > >>
> > > >>
> > > >>    *   No dynamic task mapping needed when iterating—just loop in
> > > Python,
> > > >> unless you really need to track each individual step but that 
> > > >> comes with a cost.
> > > >>    *   Massive reduction in scheduler load.
> > > >>    *   No triggerers involved.
> > > >>    *   Async code can run on Edge Workers.
> > > >>    *   Celery workers scale far much better than triggerers, so by
> > > moving
> > > >> from deferred operators and thus triggerers to async operators on 
> > > >> celery workers, our performance issues on the triggerer were gone 
> > > >> and run times were much shorter probably because the trigger 
> > > >> mechanism also puts more load on the scheduler.
> > > >>    *   Sync or async doesn’t make any difference in performance,
> > unless
> > > you
> > > >> have to execute the same async function multiple times, that’s 
> > > >> when async shines compared to sync especially with I/O related 
> > > >> operations.
> > > >>
> > > >> Concrete Example: Async SFTP Downloads
> > > >>
> > > >> Below is an example comparing the download of ~17,000 XML-files 
> > > >> and storing into our Datawarehouse.
> > > >> A single Celery worker can orchestrate many concurrent downloads 
> > > >> using asyncio.
> > > >> A semaphore (here used internally by the AsyncSFTPConnectionPool) 
> > > >> protects the SFTP server from being overloaded.
> > > >> Benchmark results:
> > > >>
> > > >> Approach
> > > >>                              Environment                    Time
> > > >> Mapped SFTPOperator
> > > >>             production                       3h 25m 55s
> > > >> PythonOperator + SFTPHook
> > > >>       local laptop                     1h 21m 09s
> > > >> Async Python task + SFTPHookAsync (without pool)       local laptop
> > > >>               8m 29s
> > > >> Async Python task + AsyncSFTPConnectionPool              production
> > > >>                 3m32s
> > > >>
> > > >> As you all can conclude, DagRun time went down from more than 3 
> > > >> hours to only 3 minutes and a half, which is huge!
> > > >>
> > > >> In the google docs there are 2 different code snippets on how 
> > > >> it’s done sync and async which I will not put here.
> > > >>
> > > >> Conclusion
> > > >>
> > > >> Using async hooks inside async Python tasks provides better 
> > > >> performance, scalability, and flexibility, and avoids reliance on
> > > triggerers entirely.
> > > >> This hybrid approach—'async where it matters, operators where 
> > > >> they make sense'—may represent the future of high‑performance 
> > > >> Airflow data processing workloads.
> > > >>
> > > >> What did I change in Airflow?
> > > >>
> > > >> Not that much, I only:
> > > >>
> > > >>
> > > >>    *   Introduced an async PythonOperator so you don’t have to handle
> > > the
> > > >> event loop yourself, not that special, but also natively 
> > > >> supported on async @task decorated python methods, which is nice to 
> > > >> read.
> > > >>    *   Did some improvements on the SFTPHookAsync to fully take
> > > advantage
> > > >> of the async.
> > > >>    *   Introduced a SFTPHookPool so multiple asyncio tasks can re-use
> > > >> connection instance to gain even more performance, in this case 
> > > >> it meant a reduction of 5 minutes in processing time, so we went 
> > > >> from 8
> > to
> > > 3 minutes.
> > > >>
> > > >>
> > > >>
> > >
> > > --------------------------------------------------------------------
> > > - To unsubscribe, e-mail: [email protected]
> > > For additional commands, e-mail: [email protected]
> > >
> > >
> >
> 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to