I like the proposal overall, and I may be bikeshedding this - if so, I apologize - but I feel it should be more explicit: either @async-task or @task.async (to match the existing @task.batch pattern). I don't feel particularly strongly about that, though.
- ferruzzi

On 2025/12/16 19:30:26 Blain David wrote:
> Hello Jarek, you completely nailed it with your explanation, that's indeed
> what I wanted to achieve.
>
> And yes, as Daniel also suggested, you could achieve the same by just
> wrapping the async code in an async method and using asyncio.run() in a
> regular PythonOperator task (like in your first example). But as I
> explained, that would certainly have worked in Airflow 2 but not in 3, due
> to the Task SDK. Of course this also depends on how the async Hook
> operations are implemented.
> Besides that, my main goal is simply to be able to execute async callables
> with the PythonOperator without having to worry about event-loop
> management.
>
> In the PR, the PythonOperator explicitly detects whether the provided
> callable is synchronous or asynchronous and chooses the correct execution
> path accordingly. The same applies to @task-decorated functions: when a
> task is defined as async, it is executed as a coroutine. This logic is
> centralized entirely in the PythonOperator; the decorators merely delegate
> to it. As a result, this behavior is fully covered by the PR and
> demonstrated in the examples included in the AIP proposal.
>
> Regarding the iteration / looping aspect shown in the examples: internally,
> we've abstracted this away using our own IterableOperator.
> So there, Jarek, you also understood exactly what I want to achieve, but
> this could be a future proposal and is out of scope for AIP-98.
> AIP-98 only focuses on the ability to let the PythonOperator execute async
> callables out of the box as a first-class citizen, and on having a
> BaseAsyncOperator (PythonOperator already extends BaseAsyncOperator in my
> PR) so we can later extend this feature beyond the PythonOperator to other
> operators as well.
>
> Regarding the more sophisticated looping abstraction I want to achieve, as
> you state, this would mean dag authors don't have to think about
> concurrency, iteration, or async execution at all. I also understand your
> earlier point about preferring mapped tasks when UI visibility is required.
> In our case, we don't need the same level of UI granularity, so this
> trade-off is acceptable for us; in return we get far better performance,
> even though at the moment it doesn't offer the same UI monitoring and
> retry mechanism that mapped tasks do.
>
> To illustrate, here's how your example would look using our
> IterableOperator (which is actually what started the AIP-88 discussion in
> the very beginning). Instead of calling expand, we simply call iterate on
> the OperatorPartial:
>
> class TheHookToRun:
>     async def method_to_run(self, value):
>         ...
>
> @task
> def lots_of_values():
>     return list(range(0, 10000))
>
> @task
> async def the_task(value):
>     conn_id = "a_connection"
>     result = await TheHookToRun(conn_id).method_to_run(value=value)
>     return result
>
> # here we use iterate instead of expand
> the_task.iterate(value=lots_of_values())
>
> In this setup, the async aspects are completely transparent to the user -
> everything is handled out of the box by the IterableOperator, and the task
> author does not need to reason about event loops or execution semantics
> like concurrency at all. The IterableOperator solution is applicable not
> only to async callables but also to regular synchronous callables.
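> Conceptually, the dispatch boils down to something like this (a minimal,
> hypothetical sketch with made-up names, not the exact PR code):
>
> import asyncio
> import inspect
>
> def _execute_callable(python_callable, *args, **kwargs):
>     # Coroutine functions get a fresh event loop via asyncio.run();
>     # plain callables are invoked directly, so the behavior of
>     # existing synchronous tasks is unchanged.
>     if inspect.iscoroutinefunction(python_callable):
>         return asyncio.run(python_callable(*args, **kwargs))
>     return python_callable(*args, **kwargs)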
>
> David
>
> -----Original Message-----
> From: Jarek Potiuk <[email protected]>
> Sent: 16 December 2025 00:55
> To: [email protected]
> Cc: Philippe Gagnon <[email protected]>
> Subject: Re: [PROPOSAL] Rethinking deferrable operators, async hooks and
> performance in Airflow 3 by supporting native async PythonOperator
>
> This is a good point Philippe - that one comment made it click why I
> struggled a bit with the proposal, and maybe this is something we all
> think this proposal is about - but it's not clear without examples.
>
> So let me rephrase how I understand it and see if we can agree on it -
> with examples.
>
> For me `@task` is always synonymous with single "process" execution - even
> when mapped - a single execution of a method in @task clearly starts when
> the process starts, and when it ends, the process exits.
> There is - I think - no particular reason why we should change that. One
> `@task` = one process.
>
> Now - currently, the task is just a simple synchronous callable run by
> PythonOperator in the main thread, so yes, indeed you cannot easily
> execute async coroutines in it without adding other async methods that
> you call explicitly with an event loop, because they need to run within
> an event loop. So you can do this now:
>
> class TheHookToRun:
>     async def method_to_run(self, val):
>         ...
>
> async def async_part_of_the_task(conn_id, array_of_values):
>     parts = [TheHookToRun(conn_id).method_to_run(val=value)
>              for value in array_of_values]
>     return await asyncio.gather(*parts)
>
> @task()
> def the_task(array_of_values):
>     conn_id = "a_connection"
>     return asyncio.run(async_part_of_the_task(conn_id, array_of_values))
>
> And I think what we could easily do - and **all** that we need - is to
> change the @task decorated methods to be able to run as coroutines, so
> conceptually we want to do this, I think:
>
> class TheHookToRun:
>     async def method_to_run(self, val):
>         ...
>
> @task()
> async def the_task(array_of_values):
>     conn_id = "a_connection"
>     parts = [TheHookToRun(conn_id).method_to_run(val=value)
>              for value in array_of_values]
>     return await asyncio.gather(*parts)
>
> So - what we want to do is to mark `the_task` as async - using standard
> Python semantics - and make sure that when the callable passed to
> PythonOperator is async, we run:
>
> asyncio.run(callable())
>
> instead of
>
> callable()
>
> We only need to check whether the callable is sync or async (something
> that FastAPI does, for example - and which caused us problems recently
> when a FastAPI release got it wrong).
>
> I think this is basically all we need to achieve what you wanted, David.
> Maybe (I am not sure) you also wanted to do the looping somehow outside
> of the `@task` callable, or something more sophisticated here, but IMHO
> just explicitly allowing async task-decorated methods and making sure
> they run in an event loop addresses exactly what you wanted to achieve.
>
> But maybe you thought about something else, David - can you provide an
> example of how it would look in a Dag?
>
> J.
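> (One practical note on the gather example above: with 10,000 values,
> asyncio.gather starts all the coroutines at once. A semaphore can bound
> the concurrency - a minimal, hypothetical sketch; gather_bounded is a
> made-up helper, not an Airflow API:)
>
> import asyncio
>
> async def gather_bounded(coros, limit=100):
>     # Cap how many coroutines run at the same time.
>     semaphore = asyncio.Semaphore(limit)
>
>     async def run(coro):
>         async with semaphore:
>             return await coro
>
>     return await asyncio.gather(*(run(c) for c in coros))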
>
> On Mon, Dec 15, 2025 at 6:09 PM Philippe Gagnon via dev
> <[email protected]> wrote:
>
> > Hi,
> >
> > > There async tasks will benefit from the multi-threading, as they
> > > share the same event loop and everything is run within the same
> > > Celery worker, but that's another solution.
> >
> > I'm not sure that's accurate; every task runs within its own process.
> > That being said, I think the ability to run async callables via
> > PythonOperator (or any other operator for that matter) is valuable in
> > and of itself, and I don't think there would be much pushback if we
> > adopted interfaces to facilitate this. WDYT about reducing the scope
> > of your proposal (and PR) to this framing? That way it would be easier
> > to gain consensus.
> >
> > (I think that's essentially what you are pitching 🙂)
> >
> > BR,
> >
> > ✨ Philippe Gagnon
> > Meet for 30 mins 📅 <https://calendar.app.google/5qzgD9SadybUvSCv8>
> >
> > On Fri, Dec 5, 2025 at 2:49 AM Blain David <[email protected]>
> > wrote:
> >
> > > Hello Jens,
> > >
> > > Thanks for your time and answer. I just granted you and Zhe-You
> > > access to the document.
> > >
> > > In the article I explained why we did the iteration ourselves
> > > within the async @task decorated function, as this was way faster
> > > than doing it with dynamic task mapping.
> > > It's not that you cannot use dynamic task mapping with the async
> > > PythonOperator - it just works as with any other operator - it just
> > > doesn't make sense, as it won't give you any performance benefits,
> > > because you don't share the same event loop (at least when using
> > > the CeleryExecutor).
> > >
> > > You could, for example, on big streams of data use dynamic task
> > > mapping to chunk the stream into multiple pieces, and then each
> > > task would process its chunk within the async operator - a bit like
> > > partitioning, if you like (see the sketch below).
> > >
> > > In the example I used for the article, we once again don't care
> > > about the individual download state of each FTP file; we just want
> > > to know whether the directory was successfully downloaded or not.
> > > Of course we added some logging statements to show which file was
> > > downloaded.
> > > I also know Jarek wants individual state tracking, but that's not
> > > the solution I presented here. For micro-batching we have our
> > > IterableOperator, where instead of partial/expand we do
> > > partial/iterate, which actually does the same as the for loop in
> > > the article's example, but managed for you in a multi-threaded way,
> > > for both sync and async tasks. There, async tasks will benefit from
> > > the multi-threading, as they share the same event loop and
> > > everything is run within the same Celery worker, but that's another
> > > solution.
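> > > As an illustration of that chunking idea - a minimal, hypothetical
> > > sketch (load_values, handle_one and the chunk size are made up, and
> > > it assumes the async @task support from this proposal):
> > >
> > > import asyncio
> > > from airflow.sdk import task  # Airflow 3 Task SDK
> > >
> > > @task
> > > def load_values():
> > >     # Made-up producer of the "big stream".
> > >     return list(range(10_000))
> > >
> > > @task
> > > def chunk_stream(values, chunk_size=500):
> > >     # Partition the stream; each chunk becomes one mapped task.
> > >     return [values[i:i + chunk_size]
> > >             for i in range(0, len(values), chunk_size)]
> > >
> > > async def handle_one(value):
> > >     ...  # made-up async hook call per item
> > >
> > > @task
> > > async def process_chunk(chunk):
> > >     # One event loop per mapped task; the items of a chunk are
> > >     # processed concurrently within a single worker process.
> > >     return await asyncio.gather(*(handle_one(v) for v in chunk))
> > >
> > > process_chunk.expand(chunk=chunk_stream(values=load_values()))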
> > > Still, with dynamic task mapping or the IterableOperator you
> > > wouldn't be able to use the SFTPClientPool (named
> > > AsyncSFTPConnectionPool in the article), so you wouldn't benefit
> > > from the performance gain you get from the pool; that's why in this
> > > example we do the looping ourselves.
> > >
> > > And I completely agree on triggerers; we also use them a lot, and
> > > they are indeed great for long-running tasks with lots of waiting
> > > time (dead time) where you're just monitoring a state - that's the
> > > purpose of triggerers!
> > > But with some operators, triggers are misused as the "only" way to
> > > run async code that returns a lot of data which has to come back to
> > > the operator so it can be exposed as an XCom. There you'll see your
> > > trigger table in the Airflow database explode fast, as each yielded
> > > response is stored in the database: a trigger can't make use of a
> > > custom XCom backend like operators do, so in practice each result
> > > yielded by the trigger first ends up in your trigger table before
> > > being stored as an XCom.
> > >
> > > Also, I'm not proposing to replace anything here; I just propose an
> > > alternative solution in Airflow so you're not purely tied to
> > > deferrable operators. Also, at the Summit we saw during a lot of
> > > presentations that people tend to use the PythonOperator (or @task)
> > > with a hook rather than using the operators themselves, which in my
> > > opinion makes sense: you have more flexibility, you still benefit
> > > from the Airflow integration with Hooks, but you aren't tied to the
> > > operator implementation, which might offer limited operations -
> > > look for example at the SFTPOperator and you'll understand
> > > immediately. That's why I propose to allow running async code in
> > > PythonOperators natively; that way you can directly interact with
> > > async hooks and you don't need a triggerer to do so. Triggers are
> > > great for polling and listening, but not for processing huge
> > > amounts of data - that's where Celery workers shine - so allowing
> > > PythonOperators to natively run async code on your Celery workers
> > > lets you do exactly that.
> > >
> > > For your last example: in our case DB calls are still sync, as to
> > > my knowledge we don't have any DB hook based on DbApiHook which
> > > supports async operations? Also, the DB operation can be seen as
> > > another sync task, so they don't need to run in the same async
> > > task; you just pass the XCom returned from the async task to the
> > > sync task. But having async DB hooks would be cool - I also thought
> > > about it, but this would depend on whether the driver supports it;
> > > it's something I would like to test in the near future.
> > >
> > > I hope this answered most of your questions, Jens 😉
> > >
> > > -----Original Message-----
> > > From: Jens Scheffler <[email protected]>
> > > Sent: 04 December 2025 21:12
> > > To: [email protected]
> > > Subject: Re: [PROPOSAL] Rethinking deferrable operators, async
> > > hooks and performance in Airflow 3 by supporting native async
> > > PythonOperator
> > > Requested access to the Google doc to read more details. I am
> > > interested - and, like Daniel, wonder what the difference is/would
> > > be.
> > >
> > > Especially as progress tracking might be important. Yes, task
> > > mapping is very expensive if you want to download 17k XML files,
> > > but also when running async: if you are at 5,000 files and you
> > > resume, would you know what was complete, or would it start from
> > > scratch every time?
> > >
> > > I think such micro-batching is cool, but some state tracking is
> > > important - which, if it is in the DB, might also overload the DB
> > > or add very many transactions.
> > >
> > > The triggerer, though, I think is still cool for long-running tasks
> > > where you just wait for a response, e.g. you triggered another job
> > > remotely or you started a Pod that runs for an hour. We have Pods
> > > running for 10h sometimes, and then it is important to be able to
> > > roll new SW to workers; with triggerers we can de-couple this.
> > >
> > > So maybe - without missing details - I would judge such
> > > micro-batching as a third execution option, but it most probably
> > > would not replace the others.
> > >
> > > Also, knowing from my own experience, writing async code is more
> > > complex and error-prone, so if you required all normal code to be
> > > async you might scare users away. Proper review is needed to ensure
> > > all I/O is async (DB calls too!).
> > >
> > > On 12/4/25 18:08, Daniel Standish via dev wrote:
> > > > Here's what I'm hearing from this:
> > > >
> > > > 1. Not using task mapping, but just looping instead, can be much
> > > > more efficient.
> > > > Yes, of course it can.
> > > >
> > > > 2. There are ways in which triggerer / deferrable operators are
> > > > not fully complete, or do not have feature parity with regular
> > > > operators (such as the custom XCom backend example).
> > > > I believe it. But this could certainly be worked on.
> > > >
> > > > Question for you:
> > > >
> > > > How is your proposal different / better than, say, just calling
> > > > `asyncio.run(...)` in a Python task?
> > > >
> > > > On Thu, Dec 4, 2025 at 8:38 AM Blain David
> > > > <[email protected]> wrote:
> > > >
> > > >> As I already discussed with Jarek in the past, but also with
> > > >> Hussein during the Airflow Summit, we at a certain moment
> > > >> encountered performance issues when using a lot of deferred
> > > >> operators.
> > > >>
> > > >> Allowing PythonOperators (and thus also @task decorated methods)
> > > >> to natively execute async Python code in Airflow solved our
> > > >> performance issues.
> > > >> And yes, you could argue whether that's really necessary, and
> > > >> what's the added value? At first you would indeed think it
> > > >> doesn't make sense at all to do so, right?
> > > >> But please bear with me and hear me out on why we did it that
> > > >> way and how it solved our performance issues, and it will become
> > > >> crystal clear 😉
> > > >> Below is the article I wrote, which is also publicly available
> > > >> on Google Docs
> > > >> <https://docs.google.com/document/d/1pNdQUB0gH-r2X1N_g774IOUEurowwQZ5OJ7yiY89qok>,
> > > >> which makes it easier to read than through the devlist.
> > > >>
> > > >> Here is my article:
> > > >>
> > > >> Rethinking deferrable operators, async hooks and performance in
> > > >> Airflow 3
> > > >>
> > > >> At our company, we strive to avoid custom code in Airflow as
> > > >> much as possible to improve maintainability.
> > > >> For years this meant favouring dedicated Airflow operators over
> > > >> Python operators.
> > > >> However, in Airflow 3, as the number of deferred operators in
> > > >> our DAGs continued to grow, we began facing severe performance
> > > >> issues with deferrable operators, which forced us to re-evaluate
> > > >> that approach.
> > > >>
> > > >> Initially we expected deferrable operators to improve
> > > >> performance for I/O-related tasks - such as REST API calls -
> > > >> because triggerers follow an async producer/consumer pattern.
> > > >> But in practice we discovered the opposite.
> > > >>
> > > >> Why did deferrable operators become the bottleneck?
> > > >>
> > > >> Deferrable operators and sensors delegate async work to
> > > >> triggerers. This is perfectly fine for lightweight tasks such as
> > > >> polling or waiting for messages on a queue.
> > > >>
> > > >> But in our case:
> > > >>
> > > >> * MSGraphAsyncOperator performs long-running async operations.
> > > >> * HttpOperator in deferrable mode can perform long-running HTTP
> > > >>   interactions, especially if pagination is involved.
> > > >> * There is no native deferrable SFTPOperator, so if we want to
> > > >>   use the SFTPHookAsync, we must use the PythonOperator, which
> > > >>   natively doesn't support async code (not that big of a
> > > >>   challenge).
> > > >> * Both can return large payloads.
> > > >> * Triggerers must store yielded events directly in the Airflow
> > > >>   metadata database.
> > > >>
> > > >> Triggerers are not designed for sustained high-load async
> > > >> execution or large data transfers. Unlike Celery workers,
> > > >> triggerers scale poorly and quickly become the bottleneck.
> > > >>
> > > >> Yielded events from triggers are stored directly in the Airflow
> > > >> metadata database because, unlike workers, triggers cannot
> > > >> leverage a custom XCom backend to offload large payloads, which
> > > >> can lead to increased database load and potential performance
> > > >> bottlenecks.
> > > >>
> > > >> Dynamic task mapping with deferrable operators amplifies the
> > > >> problem even further, which AIP-88 partially solves.
> > > >> Triggerers also cannot be run on the Edge Executor, as
> > > >> triggerers are still tightly coupled to the Airflow metadata
> > > >> database (possibly addressed in AIP-92).
> > > >>
> > > >> Rethinking the approach: async hooks + Python tasks
> > > >>
> > > >> These limitations led us to reconsider calling async hooks
> > > >> directly from Python @task decorated functions or
> > > >> PythonOperators, avoiding deferrable operators - and thus
> > > >> triggerers - entirely.
> > > >> Operators are wrappers around hooks. Well-written operators
> > > >> should contain little logic and delegate all the work to the
> > > >> hooks, which do the real work - so why not call them directly?
> > > >> This idea is also somewhat in line with what Bolke already
> > > >> presented
> > > >> <https://airflowsummit.org/slides/2023/ab1-1400-Operators.pdf>
> > > >> in 2023.
> > > >>
> > > >> Advantages of this approach include:
> > > >>
> > > >> * No dynamic task mapping needed when iterating - just loop in
> > > >>   Python, unless you really need to track each individual step,
> > > >>   but that comes with a cost.
> > > >> * Massive reduction in scheduler load.
> > > >> * No triggerers involved.
> > > >> * Async code can run on Edge Workers.
> > > >> * Celery workers scale far better than triggerers, so by moving
> > > >>   from deferred operators (and thus triggerers) to async
> > > >>   operators on Celery workers, our performance issues on the
> > > >>   triggerer were gone and run times were much shorter - probably
> > > >>   also because the trigger mechanism puts extra load on the
> > > >>   scheduler.
> > > >> * Sync or async doesn't make any difference in performance,
> > > >>   unless you have to execute the same async function multiple
> > > >>   times; that's when async shines compared to sync, especially
> > > >>   with I/O-related operations.
> > > >>
> > > >> Concrete example: async SFTP downloads
> > > >>
> > > >> Below is a comparison of downloading ~17,000 XML files and
> > > >> storing them in our data warehouse.
> > > >> A single Celery worker can orchestrate many concurrent downloads
> > > >> using asyncio.
> > > >> A semaphore (here used internally by the AsyncSFTPConnectionPool)
> > > >> protects the SFTP server from being overloaded.
> > > >>
> > > >> Benchmark results:
> > > >>
> > > >> Approach                                         | Environment  | Time
> > > >> Mapped SFTPOperator                              | production   | 3h 25m 55s
> > > >> PythonOperator + SFTPHook                        | local laptop | 1h 21m 09s
> > > >> Async Python task + SFTPHookAsync (without pool) | local laptop | 8m 29s
> > > >> Async Python task + AsyncSFTPConnectionPool      | production   | 3m 32s
> > > >>
> > > >> As you can see, the DagRun time went down from more than 3 hours
> > > >> to only three and a half minutes, which is huge!
> > > >>
> > > >> In the Google doc there are 2 different code snippets showing
> > > >> how it's done sync and async, which I will not put here.
> > > >>
> > > >> Conclusion
> > > >>
> > > >> Using async hooks inside async Python tasks provides better
> > > >> performance, scalability, and flexibility, and avoids reliance
> > > >> on triggerers entirely.
> > > >> This hybrid approach - "async where it matters, operators where
> > > >> they make sense" - may represent the future of high-performance
> > > >> Airflow data processing workloads.
> > > >>
> > > >> What did I change in Airflow?
> > > >>
> > > >> Not that much. I only:
> > > >>
> > > >> * Introduced an async PythonOperator so you don't have to handle
> > > >>   the event loop yourself - nothing special, but it is also
> > > >>   natively supported on async @task decorated Python methods,
> > > >>   which is nice to read.
> > > >> * Made some improvements to the SFTPHookAsync to take full
> > > >>   advantage of async.
> > > >> * Introduced an SFTPHookPool so multiple asyncio tasks can
> > > >>   re-use connection instances to gain even more performance; in
> > > >>   this case it meant a reduction of 5 minutes in processing
> > > >>   time, so we went from 8 to 3 minutes (see the sketch below).
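> > > >> To give an idea of how such a pool can work, here is a minimal,
> > > >> hypothetical sketch (SFTPClientPool, client.get and download_all
> > > >> are made-up names, not the PR's actual API):
> > > >>
> > > >> import asyncio
> > > >>
> > > >> class SFTPClientPool:
> > > >>     # A fixed set of reusable async clients. The queue itself
> > > >>     # caps concurrency: the SFTP server never sees more than
> > > >>     # len(clients) simultaneous sessions.
> > > >>     def __init__(self, clients):
> > > >>         self._idle = asyncio.Queue()
> > > >>         for client in clients:
> > > >>             self._idle.put_nowait(client)
> > > >>
> > > >>     async def acquire(self):
> > > >>         return await self._idle.get()  # waits when all are busy
> > > >>
> > > >>     def release(self, client):
> > > >>         self._idle.put_nowait(client)  # hand back for reuse
> > > >>
> > > >> async def download_all(pool, remote_paths):
> > > >>     async def download(path):
> > > >>         client = await pool.acquire()
> > > >>         try:
> > > >>             await client.get(path)  # hypothetical client method
> > > >>         finally:
> > > >>             pool.release(client)
> > > >>
> > > >>     await asyncio.gather(*(download(p) for p in remote_paths))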
