With the limited scope of it (I think the original proposal looked quite a
bit more than it really is) - I really like the idea. It grew with me over
time. Thanks David for being so persistent and diligent in explaining the
context and reasoning. I finally got the hang of it.

I think currently, when we do not "advertise" the possibility of
running any code - not only hooks - asynchronously people might not even
realise that they can do something like described above, and
performance-wise I think there is a huge boost for a number of workflows.

If we handle `async def the_task` with starting and managing the async loop
under the hood and have a nice documentation about it, it might spark ideas
for people to actually use it.

Handling the loop on our end is - I think - better, simply because we can
do other things that we will find useful - and some of our internal utils
and constructs might make use of that - async connection and variable
retrievals that we already use for Triggers, but likely some more things
that we will find useful will get async as first-class-citizen - simply
because it will be natural to use them in such async tasks. Currently we
might not even think of some of those things but I am sure we will find
quite a few things. Also we can add such event loop management in task
lifecycle - i am quite sure that with some of the "cross cutting" concerns
we have - like open lineage, being able to know when the  event loop starts
and when it ends and even being able to monitor it might be very useful -
so having "our" event loop seems like a good idea rather than leaving it to
the user to manage it.

Also - what it opens up - is an interesting opportunity. We discussed it
with David before. Ideally we would like to still treat such iterative
tasks like mapped tasks and show their progress in the UI of Airflow. This
would be heavy penalty on performance if we do it in naive way, however -
being able to monitor and query the "common" event loop opens up an
interesting opportunity - we could eventually implement some light-wait API
to do it and use the newer Python version features which increased
query-ability and introspection of the async tasks, For example in Python
3.14 there are new tools added [1] to do a deep introspection of async
tasks. We could make use of asyncio TaskGroups as well and expose
deep-status of such async tasks via Airflow UI.

Imagine the task from David's example - where in the Airflow UI we could
see the %progress of such SFTP, list of errors, etc, etc. in a structured
way - not only as logs. That sounds like a very exciting opportunity. And
not needed now - but making `async tasks` first class citizens, we could do
something like that in the future - fairly easily I'd say.

Also we can indeed come up with some common tooling - like IterableOperator
- where documentation and examples might spark people to use more of the
async functionality - especially when they seek performance and
concurrency, implement some common patterns that will allow to - indeed
like David wrote - finally use Hooks as the building blocks of our
workflows, rather than Operators.

I am sure Bolke might have something to say because it goes hand-in-hand
with his "click-baity' talk from previous Summit - "Operators need to die"
[2]

Also to Dennis:

```
@task
async def the_task
``

This is absolutely the most pythonic way of expressing the intent here.
This is also the pattern that we already have from fast-api. Fast API
already does pretty much the same - they decide how to handle "handlers" of
ours - depending if they are defined as async or not - same decorators are
applied to async and sync methods - it's the framework that decides on how
to wrap their execution. This has proven to be both - intuitive and working
well, I think we should follow the suit rather than introduce a new
decorator type. Also as David wrote - we can further extend that to other
types of decorators.

While async @task.bash is likely somewhat strange, it does not seem as
off-the-chart for `async @task.docker` or `async @task.kubernetes` - it
would require a bit different handling (asyncio loop would have to start
inside the containers/pods but I very easily imagine someone trying to
write such async methods for those - and pretty much any other task
decorator.

[1]
https://docs.python.org/3/whatsnew/3.14.html#whatsnew314-asyncio-introspection
[2] https://www.youtube.com/watch?v=J5pbH1TUv0U

J


On Wed, Dec 17, 2025 at 2:24 AM Dennis Ferruzzi <[email protected]> wrote:

> I like the proposal overall and may be bikeshedding this - and if so I do
> apologize - but I feel it should be more explicit, either @async-task or
> @task.async (to match the existing @task.batch pattern)... but I don't feel
> particularly strongly about that.
>
>  - ferruzzi
>
> On 2025/12/16 19:30:26 Blain David wrote:
> > Hello Jarek, you completely nailed it with your explanation, that's
> indeed what I wanted to achieve.
> >
> > And yes, as Daniel also suggested, you could achieve the same by just
> wrapping the async code in an async method and using asyncio.run() in a
> regular PythonOperator task (like in your first example).  But as I
> explained, that would have certainly worked in Airflow 2 but not in 3, due
> to Task SDK.  Of course this also depends on how the async Hook operations
> are implemented.
> > Besides that, my main goal is simply to be able to execute async
> callables with the PythonOperator without having to worry about event-loop
> management.
> >
> > In the PR, the PythonOperator explicitly detects whether the provided
> callable is synchronous or asynchronous and chooses the correct execution
> path accordingly. The same applies to @task-decorated functions: when a
> task is defined as async, it is executed as a coroutine. This logic is
> centralized entirely in the PythonOperator; the decorators merely delegate
> to it. As a result, this behavior is fully covered by the PR and
> demonstrated in the examples included in the AIP proposal.
> >
> > Regarding the iteration / looping aspect shown in the examples:
> internally, we’ve abstracted this away using our own IterableOperator.
> > So there, Jarek, you also completely understood correctly what I want to
> achieve, but this could be a future proposal and is out of the scope for
> AIP-98.
> > AIP-98 only focusses on the ability to allow the PythonOperator to
> execute async callables out of the box as a first-class citizen and have a
> BaseAsyncOperator (PythonOperator extends BaseAsyncOperator already in my
> PR) so we can later on extend this feature not only on the PythonOperator
> but to other operators as well.
> >
> > Regarding the more sophisticated looping abstraction I want to achieve
> as you state, this would mean dag authors don’t have to think about
> concurrency, iteration, or async execution at all. I also understand your
> earlier point we discussed about preferring mapped tasks when UI visibility
> is required. In our case, we don’t need the same level of UI granularity,
> so this trade-off is acceptable for us and in return we have far much
> better performance, so at the moment it doesn't offer the same UI
> monitoring and retry mechanism as the mapped tasks mechanism does.
> >
> > To illustrate, here’s how your example would look using our
> IterableOperator (which actually started AIP-88 discussion in the very
> beginning).
> > Instead of calling expand, we simply call iterate on the OperatorPartial:
> >
> > class TheHookToRun:
> >     async def method_to_run(self, value):
> >         ...
> >
> > @task
> > def lots_of_values():
> >     return list(range(0, 10000))
> >
> > @task
> > async def the_task(value):
> >     conn_id = "a_connection"
> >     result = await TheHookToRun(conn_id).method_to_run(val=value)
> >     return result
> >
> > the_task.iterate(value=lots_of_values())  # here we use iterate instead
> of expand
> >
> > In this setup, the async aspects are completely transparent to the user
> — everything is handled out of the box by the IterableOperator, and the
> task author does not need to reason about event loops or execution
> semantics like concurrency at all.  The IterableOperator solution is not
> only applicable for async callables but also for regular synchronous
> callables.
> >
> > David
> >
> > -----Original Message-----
> > From: Jarek Potiuk <[email protected]>
> > Sent: 16 December 2025 00:55
> > To: [email protected]
> > Cc: Philippe Gagnon <[email protected]>
> > Subject: Re: [PROPOSAL] Rethinking deferrable operators, async hooks and
> performance in Airflow 3 by supporting native async PythonOperator
> >
> > EXTERNAL MAIL: Indien je de afzender van deze e-mail niet kent en deze
> niet vertrouwt, klik niet op een link of open geen bijlages. Bij twijfel,
> stuur deze e-mail als bijlage naar [email protected]<mailto:
> [email protected]>.
> >
> > This is a good point Philippe - that one comment clicked why I had a bit
> of struggle with the proposal and maybe this is something that we all think
> this proposal is about - but it's not clear without examples.
> >
> > So let me rephrase how I understand it and see if we can agree on it -
> with examples.
> >
> > For me `@task` is always synonymous with single "process" execution -
> even when mapped - the single execution of a method in @task clearly starts
> when the process starts and when it ends - the process exits.
> > There is - I think - no particular reason why we should change that. One
> `@task` = one process.
> >
> > Now - currently, the task is just a simple synchronous callable run by
> PythonOperator in the main thread, so yes, indeed you cannot easily execute
> async coroutines in it without adding other async methods that you call
> explicitly with event loop, because you need to run it within an event
> loop. So you can do this now:
> >
> > class TheHookToRun:
> >        async def method_to_run():
> >
> > async def async_part_of_the_task(conn_id, array_of_values):
> >     parts = [ TheHookToRun(conn_id).method_to_run(val=value) for value
> in array_of_values]
> >     results = await asyncio.gather(*parts)
> >
> > @task()
> > def the_task(array_of_values):
> >     conn_id = "a_connection"
> >         asyncio.run(async_part_of_the_task(conn_id, array_of_values)
> >
> >
> > And I think what we could easily do and **all** that we need is to
> change the @task decorated methods to be able to run as coroutine, so
> conceptually we want to do this, I think:
> >
> > class TheHookToRun:
> >        async def method_to_run():
> >
> > @task()
> > async def the_task(array_of_values):
> >     conn_id = "a_connection"
> >     parts = [ TheHookToRun(conn_id).method_to_run(val=value) for value
> in array_of_values]
> >     results = await asyncio.gather(*parts)
> >
> > So - what we want to do is to mark `the_task` as async - using standard
> python semantics and make sure that when callable is passed to
> PythonOperator is async, we run:
> >
> > asyncio.run(callable())
> >
> > instead of
> >
> > callable()
> >
> > We only need to check if the callable is sync or async (something that
> for example fast_api does and we had recent problems with in a recent
> version when they got it wrong).
> >
> > I think this is basically all we need to achieve what you wanted David.
> > Maybe (I am not sure) you also wanted to do looping  somehow outside of
> the `@task` callable, do something more sophisticated here, but IMHO just
> explicitly allowing async task-decorated methods and making sure they run
> in an event loop addresses exactly what you wanted to achieve.
> >
> > But maybe you thought about something else David - and can provide an
> example how it would look like in Dag ?
> >
> > J.
> >
> >
> >
> > On Mon, Dec 15, 2025 at 6:09 PM Philippe Gagnon via dev <
> [email protected]> wrote:
> >
> > > Hi,
> > >
> > > > There async tasks will benefit from the multi threading, as they
> > > > share
> > > the same event loop and everything is run within the same Celery
> > > worker, but that's another solution.
> > >
> > > I'm not sure that's accurate; every task runs within its own process.
> > > That being said, I think the ability to run async callables via
> > > PythonOperator (or any other operator for that matter) is valuable in
> > > and of itself, and I don't think there would be much pushback if we
> > > adopted interfaces to facilitate this. WDYT about reducing the scope
> > > of your proposal (and PR) to this framing, that way it would be easier
> to gain consensus?
> > >
> > > (I think that's essentially what you are pitching 🙂)
> > >
> > > BR,
> > >
> > > *✨ **Philippe Gagnon*
> > > *Meet for 30 mins 📅*
> > > <https://eur01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcal
> > > endar.app.google%2F5qzgD9SadybUvSCv8&data=05%7C02%7Cdavid.blain%40infr
> > > abel.be%7C5b39dc7254d841e56d3f08de3c35adac%7Cb82bc314ab8e4d6fb18946f02
> > > e1f27f2%7C0%7C0%7C639014398414722610%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0e
> > > U1hcGkiOnRydWUsIlYiOiIwLjAuMDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIld
> > > UIjoyfQ%3D%3D%7C0%7C%7C%7C&sdata=4MuD3ytW04UljU8B9UDilgioTOGrDdXrLkIm0
> > > d1A0nU%3D&reserved=0>
> > >
> > >
> > > On Fri, Dec 5, 2025 at 2:49 AM Blain David <[email protected]>
> > > wrote:
> > >
> > > > Hello Jense,
> > > >
> > > > Thanks for your time and answer.  I just granted you and Zhe-You
> > > > access
> > > to
> > > > the document.
> > > >
> > > > In the article I explained why we did the iteration ourselves within
> > > > the async @task decorated function, as this was way faster than
> > > > doing it with dynamic task mapping.
> > > > Not that you cannot use dynamic task mapping with async
> > > > PythonOperator,
> > > it
> > > > just works as with any other operator, it's just doesn't make sense
> > > > as it won't give you any performance benefits due to the fact that
> > > > you don't share the same event loop (at least when using the
> CeleryExecutor).
> > > >
> > > > You could for example on big streams of data use dynamic tsk mapping
> > > > to chunk the stream in multiple pieces and then each task would
> > > > process the chunk within the async operator for example, a bit like
> > > > partition if you like.
> > > >
> > > > In the example I used for the article, we once again don't care
> > > > about individual download state of the FTP-file, we just want to
> > > > know if the directory was successfully downloaded or not, ofc we
> > > > added some logging statements to show which file was downloaded.
> > > > I also know Jarek wants individual state tracking, but that' not the
> > > > solution I presented here, for micro batching we have our
> > > IterableOperator,
> > > > which instead of doing partial/expand we do partial/iterate, which
> > > actually
> > > > does the same as the for loop of the example in the article but then
> > > > managed for you in a multi threaded way for sync as async tasks as
> > > > well.  There async tasks will benefit from the multi threading, as
> > > > they share the same event loop and everything is run within the same
> > > > Celery worker, but that's another solution.
> > > >
> > > > Still with the dynamic task mapping or IterableOperator, you
> > > > wouldn't be able to use the SFTPClientPool (before name
> > > > AsyncSFTPConnectionPool as in the article), so you wouldn't benefit
> > > > of the performance gain you get
> > > from
> > > > the pool, that why here in this example, we do the looping
> > > > ourselves.
> > > >
> > > > And I completely agree for triggerers, we also use it a lot, and it
> > > > is indeed cool for long running tasks in which have lot's of waiting
> > > > times (dead time), and you're just monitoring a state, that the
> > > > purpose of triggerers!
> > > > But with some operators, triggers are misused as they are the "only"
> > > > way to run async code which returns a lot of data which have to come
> > > > back to the operator so it can be exposed as an XCom, there you'll
> > > > see that you trigger table in the Airflow database will explode
> > > > fast, As each yielded response is being stored in the database, as
> > > > it can't
> > > make
> > > > use of a custom XCom backend like operators do, so in practise, each
> > > result
> > > > yielded by the trigger, will first end up in your tirgger table
> > > > before being stored as an XCom.
> > > >
> > > > Also, I'm not telling here to replace anything, I just propose an
> > > > alternative solution in Airflow so you're not purely tied to
> > > > deferrable operators.  Also at the Summit, we experienced during a
> > > > lot of presentations, that people tend to use more the
> > > > PythonOperators (or
> > > @task)
> > > > with hook than
> > > > using the operators themselves, which in my opinion makes sense as
> > > > you have more flexibility, you still benefit from the Airflow
> > > > integration
> > > with
> > > > Hooks but you aren't tight to the operator implementation, which
> > > > might offer limited operations, look for example at the SFTPOperator
> > > > and you'll understand immediately.  That's why I propose to allow
> > > > running async code in PythonOperators natively, that way, you can
> > > > directly interact with
> > > async
> > > > hooks and you don't need a triggerer to do that.  Triggers are great
> > > > for polling and listening, but not for processing huge amounts of
> > > > data, that's where celery workers shine, thus allowing
> > > > PythonOperators to natively run async code in you celery workers,
> you can do so.
> > > >
> > > > For you last example, in our case DB calls are still sync, as of my
> > > > knowledge, we don’t have any DB hook based on DBApiHook which
> > > > supports async operations?  Also the db operation can be seen as
> > > > another sync
> > > task,
> > > > so they don't need to run in the same async task, you just pass the
> > > > XCom returned from the async task to the sync task.  But having
> > > > async DB hooks could be cool, I also though about it, but this would
> > > > also depend on the driver if it supports it, still it also something
> > > > I would like to test in the near future.
> > > >
> > > > I hope this answered most of your questions Jens 😉
> > > >
> > > > -----Original Message-----
> > > > From: Jens Scheffler <[email protected]>
> > > > Sent: 04 December 2025 21:12
> > > > To: [email protected]
> > > > Subject: Re: [PROPOSAL] Rethinking deferrable operators, async hooks
> > > > and performance in Airflow 3 by supporting native async
> > > > PythonOperator
> > > >
> > > > EXTERNAL MAIL: Indien je de afzender van deze e-mail niet kent en
> > > > deze niet vertrouwt, klik niet op een link of open geen bijlages.
> > > > Bij twijfel, stuur deze e-mail als bijlage naar [email protected]
> <mailto:
> > > > [email protected]>.
> > > >
> > > > Requested access to Google doc to read more details. Am interested
> > > > and also as Daniel what the difference is/would be.
> > > >
> > > > Especially also as progress tracking might be important. Yes, Task
> > > Mapping
> > > > is very expensive if you want to download 17k XML files, but also
> > > > when running Async and you are at 5000 files, if you resume would
> > > > you know
> > > what
> > > > was complete or would it start from scratch all times?
> > > >
> > > > I think such micro-batching is cool but some state tracking is
> > > > important
> > > > - which might if it is in the DB also overload the DB or add very
> > > > many transactions.
> > > >
> > > > Trioggerer though I think still is cool for long running tasks where
> > > > you just wait for response, e.g. you triggered another job remote or
> > > > you started a Pod that runs for an hour. We have Pods runnign for
> > > > 10h
> > > sometimes
> > > > and then it is important to be able to roll new SW to workers and
> > > > with triggerers we cann de-couple this.
> > > >
> > > > So maybe - without missing details - I would judge such
> > > > micro-batching as a third execution option but most probably would
> not replace the others.
> > > >
> > > > Also knowin from own experience, writing async code is more complex
> > > > and error prone, so if you would request all normal code being async
> > > > you
> > > might
> > > > scare users away. Proper review needed to ensure all IO is async
> > > > (also DB
> > > > calls!)
> > > >
> > > > On 12/4/25 18:08, Daniel Standish via dev wrote:
> > > > > Here's what I'm hearing from this
> > > > >
> > > > > 1. not using task mapping, but just looping instead, can be much
> > > > > more efficient.
> > > > > Yes, of course it can.
> > > > >
> > > > > 2. there are ways in which triggerer / deferrable operators are
> > > > > not fully complete, or do not fully have feature parity with
> > > > > regular operators (such as the custom xcom backend example) I
> > > > > believe it.  But this could certainly be worked on.
> > > > >
> > > > > Question for you:
> > > > >
> > > > > How is your proposal different / better than say, just calling
> > > > > `asyncio.run(...)` in a python task?
> > > > >
> > > > >
> > > > > On Thu, Dec 4, 2025 at 8:38 AM Blain David
> > > > > <[email protected]>
> > > > wrote:
> > > > >
> > > > >> As I already discussed with Jarek in the past but also with
> > > > >> Hussein during the Airflow Summit, we at a certain moment
> > > > >> encountered performance issues when using a lot of deferred
> operators.
> > > > >>
> > > > >> Allowing PythonOperators (and thus also @task decorated methods)
> > > > >> to natively execute async Python code in Airflow solved our
> > > > >> performance
> > > > issues.
> > > > >> And yes, you could argue if that’s really necessary and also
> > > > >> what’s the added value? And at first you would indeed think it
> > > > >> doesn’t make sense at all do so, right?
> > > > >> But please bear with me first and hear me out first why we did it
> > > > >> that way and how it solved our performance issues and it will
> > > > >> become crystal clear 😉
> > > > >> So below is the article I wrote, which is also publicly available
> > > > >> here< https://doc/
> > > > >> s.google.com%2Fdocument%2Fd%2F1pNdQUB0gH-r2X1N_g774IOUEurowwQZ5OJ
> > > > >> 7yiY
> > > > >> 89qok&data=05%7C02%7Cdavid.blain%40infrabel.be%7Ca90fcc33ab604f91
> > > > >> 948f
> > > > >> 08de33719fc0%7Cb82bc314ab8e4d6fb18946f02e1f27f2%7C0%7C0%7C6390047
> > > > >> 6028
> > > > >> 6869534%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwL
> > > > >> jAuM
> > > > >> DAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7C
> > > > >> %7C&
> > > > >> sdata=%2FNbnXz%2BWTH0WDp8lTic8sraokWDojaNYfr51I2ohy58%3D&reserved
> > > > >> =0> on Google Docs  which makes it easier to read than through
> > > > >> the
> > > devlist.
> > > > >>
> > > > >> Here is my article:
> > > > >>
> > > > >> Rethinking deferrable operators, async hooks and performance in
> > > > >> Airflow 3
> > > > >>
> > > > >> At our company, we strive to avoid custom code in Airflow as much
> > > > >> as possible to improve maintainability.
> > > > >> For years this meant favouring dedicated Airflow operators over
> > > > >> Python operators.
> > > > >> However, in Airflow 3, as the number of deferred operators in our
> > > > >> DAGs continued to grow, we began facing severe performance issues
> > > > >> with deferrable operators, which forced us to re-evaluate that
> > > approach.
> > > > >>
> > > > >> Initially we expected deferrable operators to improve performance
> > > > >> for I/O-related tasks—such as REST API calls—because triggerers
> > > > >> follow an async producer/consumer pattern. But in practice we
> > > > >> discovered the
> > > > opposite.
> > > > >>
> > > > >> Why Deferrable Operators Became the Bottleneck?
> > > > >>
> > > > >> Deferrable operators and sensors delegate async work to
> triggerers.
> > > > >> This is perfectly fine for lightweight tasks such as polling or
> > > > >> waiting for messages on a queue.
> > > > >>
> > > > >> But in our case:
> > > > >>
> > > > >>
> > > > >>    *   MSGraphAsyncOperator performs long-running async
> operations.
> > > > >>    *   HttpOperator in deferrable mode can perform long-running
> HTTP
> > > > >> interactions, especially if pagination is involved.
> > > > >>    *   There is no native deferrable SFTPOperator, so if we want
> to
> > > use
> > > > the
> > > > >> SFTPHookAsync, we must use the PythonOperator which natively
> > > > >> doesn’t support async code (not that big of challenge).
> > > > >>    *   Both can return large payloads.
> > > > >>    *   Triggerers must store yielded events directly into the
> Airflow
> > > > >> metadata database.
> > > > >>
> > > > >> Triggerers are not designed for sustained high-load async
> > > > >> execution or large data transfers. Unlike Celery workers,
> > > > >> triggerers scale poorly and quickly become the bottleneck.
> > > > >>
> > > > >> Yielded events from triggers are stored directly in the Airflow
> > > > >> metadata database because, unlike workers, triggers cannot
> > > > >> leverage a custom XCom backend to offload large payloads, which
> > > > >> can lead to increased database load and potential performance
> bottlenecks.
> > > > >>
> > > > >> Dynamic task mapping with deferrable operators amplifies the
> > > > >> problem even further which AIP‑88 partially solves.
> > > > >> Triggerers also cannot be run on the Edge Executor as triggerers
> > > > >> are still tightly coupled with the Airflow metadata database
> > > > >> (possibly addressed in AIP‑92).
> > > > >>
> > > > >> Rethinking the approach: Async hooks + Python tasks
> > > > >>
> > > > >> These limitations led us to reconsider calling async hooks
> > > > >> directly from Python @task decorated functions or
> > > > >> PythonOperators, thus avoiding deferrable operators and thus
> triggerers entirely.
> > > > >> Operators are wrappers around hooks. Well‑written operators
> > > > >> should contain little logic and delegate all the work to the
> > > > >> hooks which do the real work,so  why not call them directly?
> > > > >> This idea is also a bit in line with what Bolke already
> > > > >> presented<
> > > > >> https://eur01.safelinks.protection.outlook.com/?url=https%3A%2F%2
> > > > >> Fairflowsummit.org%2Fslides%2F2023%2Fab1-1400-Operators.pdf&data=
> > > > >> 05%7C02%7Cdavid.blain%40infrabel.be%7C5b39dc7254d841e56d3f08de3c3
> > > > >> 5adac%7Cb82bc314ab8e4d6fb18946f02e1f27f2%7C0%7C0%7C63901439841474
> > > > >> 7203%7CUnknown%7CTWFpbGZsb3d8eyJFbXB0eU1hcGkiOnRydWUsIlYiOiIwLjAu
> > > > >> MDAwMCIsIlAiOiJXaW4zMiIsIkFOIjoiTWFpbCIsIldUIjoyfQ%3D%3D%7C0%7C%7
> > > > >> C%7C&sdata=yxB9i4xCbGvDpYZ4VY3UDiRIpUe4pBqlL1xYFixKoXw%3D&reserve
> > > > >> d=0> in
> > > 2023.
> > > > >>
> > > > >> Advantages of this approach include:
> > > > >>
> > > > >>
> > > > >>    *   No dynamic task mapping needed when iterating—just loop in
> > > > Python,
> > > > >> unless you really need to track each individual step but that
> > > > >> comes with a cost.
> > > > >>    *   Massive reduction in scheduler load.
> > > > >>    *   No triggerers involved.
> > > > >>    *   Async code can run on Edge Workers.
> > > > >>    *   Celery workers scale far much better than triggerers, so by
> > > > moving
> > > > >> from deferred operators and thus triggerers to async operators on
> > > > >> celery workers, our performance issues on the triggerer were gone
> > > > >> and run times were much shorter probably because the trigger
> > > > >> mechanism also puts more load on the scheduler.
> > > > >>    *   Sync or async doesn’t make any difference in performance,
> > > unless
> > > > you
> > > > >> have to execute the same async function multiple times, that’s
> > > > >> when async shines compared to sync especially with I/O related
> operations.
> > > > >>
> > > > >> Concrete Example: Async SFTP Downloads
> > > > >>
> > > > >> Below is an example comparing the download of ~17,000 XML-files
> > > > >> and storing into our Datawarehouse.
> > > > >> A single Celery worker can orchestrate many concurrent downloads
> > > > >> using asyncio.
> > > > >> A semaphore (here used internally by the AsyncSFTPConnectionPool)
> > > > >> protects the SFTP server from being overloaded.
> > > > >> Benchmark results:
> > > > >>
> > > > >> Approach
> > > > >>                              Environment                    Time
> > > > >> Mapped SFTPOperator
> > > > >>             production                       3h 25m 55s
> > > > >> PythonOperator + SFTPHook
> > > > >>       local laptop                     1h 21m 09s
> > > > >> Async Python task + SFTPHookAsync (without pool)       local
> laptop
> > > > >>               8m 29s
> > > > >> Async Python task + AsyncSFTPConnectionPool
> production
> > > > >>                 3m32s
> > > > >>
> > > > >> As you all can conclude, DagRun time went down from more than 3
> > > > >> hours to only 3 minutes and a half, which is huge!
> > > > >>
> > > > >> In the google docs there are 2 different code snippets on how
> > > > >> it’s done sync and async which I will not put here.
> > > > >>
> > > > >> Conclusion
> > > > >>
> > > > >> Using async hooks inside async Python tasks provides better
> > > > >> performance, scalability, and flexibility, and avoids reliance on
> > > > triggerers entirely.
> > > > >> This hybrid approach—'async where it matters, operators where
> > > > >> they make sense'—may represent the future of high‑performance
> > > > >> Airflow data processing workloads.
> > > > >>
> > > > >> What did I change in Airflow?
> > > > >>
> > > > >> Not that much, I only:
> > > > >>
> > > > >>
> > > > >>    *   Introduced an async PythonOperator so you don’t have to
> handle
> > > > the
> > > > >> event loop yourself, not that special, but also natively
> > > > >> supported on async @task decorated python methods, which is nice
> to read.
> > > > >>    *   Did some improvements on the SFTPHookAsync to fully take
> > > > advantage
> > > > >> of the async.
> > > > >>    *   Introduced a SFTPHookPool so multiple asyncio tasks can
> re-use
> > > > >> connection instance to gain even more performance, in this case
> > > > >> it meant a reduction of 5 minutes in processing time, so we went
> > > > >> from 8
> > > to
> > > > 3 minutes.
> > > > >>
> > > > >>
> > > > >>
> > > >
> > > > --------------------------------------------------------------------
> > > > - To unsubscribe, e-mail: [email protected]
> > > > For additional commands, e-mail: [email protected]
> > > >
> > > >
> > >
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>

Reply via email to