Ah, that makes sense, thank you.

I've not tried 1000s (isn't the default limit 1000 tasks per triggerer
process?), but I haven't seen any performance issues on our VMs as we scale
up; we've tested up to nearly 100 AsyncSqlSensors plus 200 other sensors.

Damian
________________________________
From: Hussein Awala <huss...@awala.fr>
Sent: Wednesday, April 10, 2024 5:44:48 PM
To: dev@airflow.apache.org <dev@airflow.apache.org>
Subject: Re: [DISCUSS] Asynchronous SQLAlchemy

What you propose is exactly what `sync_to_async` does implicitly (check the source
code
<https://github.com/django/asgiref/blob/db3ff43e9fa1daf7c6b1cb67db8eec1885be911d/asgiref/sync.py#L164-L254>),
and as we mentioned before, this solution would work, but coroutines are
lightweight compared to threads, easy to manage (suspend, resume, cancel),
and have better exception-handling support. The triggerer is supposed to
handle thousands of triggers, which will be a big challenge if we use
threads.
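
For illustration, a minimal sketch of that pattern (the function names are
made up for the example, not taken from any existing provider):

    import asyncio
    from asgiref.sync import sync_to_async

    def get_records() -> list:
        # stands in for a blocking call using a synchronous driver
        return [("row",)]

    async def main():
        # sync_to_async runs the blocking function in a worker thread and
        # returns an awaitable; thread_sensitive=False lets it use the
        # thread pool instead of a single shared thread
        records = await sync_to_async(get_records, thread_sensitive=False)()
        print(records)

    asyncio.run(main())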

On Wed, Apr 10, 2024 at 5:16 PM Damian Shaw <ds...@striketechnologies.com>
wrote:

> I know this is a different use case, but I maintain an internal Airflow
> provider that creates a SQLAlchemy connection type so I can create an
> AsyncSqlSensor that uses SQLAlchemy to manage the connections in the
> triggerer (I keep a global engine per connection id so that there can be a
> global connection pool, plus great SQLAlchemy features like pool pre-ping
> and pool recycle). I was going to try to discuss it with Jarek next month
> in NYC if he has a moment.
>
> For non-async drivers I pass the engine to a thread via a ThreadPoolExecutor,
> and then I wrap the resulting future with asyncio. It works surprisingly
> well, but it does have an edge case around certain drivers calling stored
> procedures.
>
> Here's the skeleton of the code:
>     import asyncio
>     from concurrent.futures import ThreadPoolExecutor
>     from typing import Any, Sequence
>
>     from sqlalchemy import text
>     from sqlalchemy.engine import Engine
>     from sqlalchemy.orm import scoped_session, sessionmaker
>
>     from airflow.triggers.base import TriggerEvent
>
>     async def run(self):
>         engine = self._get_engine()
>         while True:
>             with ThreadPoolExecutor(max_workers=1) as executor:
>                 # run the blocking query in a worker thread and await the
>                 # future so the triggerer's event loop is never blocked
>                 future = executor.submit(self._get_records, engine)
>                 records = await asyncio.wrap_future(future)
>                 if self.success(records=records):
>                     yield TriggerEvent(True)
>             await asyncio.sleep(self.poke_interval)
>
>     def _get_records(self, engine: Engine) -> Sequence[Sequence[Any]] | None:
>         # scoped_session for thread safety; the engine (and its connection
>         # pool) is shared globally per connection id
>         Session = scoped_session(
>             sessionmaker(autocommit=False, autoflush=False, bind=engine)
>         )
>         try:
>             session = Session()
>             cursor = session.execute(text(self.sql), self.parameters)
>             result = list(cursor.fetchall())
>         finally:
>             Session.remove()
>         return result
>
> I use scoped session for thread safety, as I found some edge cases where
> that helped.
>
> I imagine this approach would work in a lot of cases where only a
> synchronous driver can be used but you're working in an asynchronous
> context, hope this idea is helpful.
>
> Damian
>
>
> -----Original Message-----
> From: Amogh Desai <amoghdesai....@gmail.com>
> Sent: Wednesday, April 10, 2024 12:33 AM
> To: dev@airflow.apache.org
> Subject: Re: [DISCUSS] Asynchronous SQLAlchemy
>
> Thanks, Hussein and Ash, I am much clearer on the scope now and I am ok
> with the discussion going on in the thread so far.
>
>
> Thanks & Regards,
> Amogh Desai
>
>
> On Tue, Apr 9, 2024 at 2:25 AM Andrey Anshin <andrey.ans...@taragol.is>
> wrote:
>
> > If I'm not missing something, usage of the DB is not covered by the Airflow
> > Public Interface, so we could easily replace the sync methods with async
> > ones one by one.
> > There are some places where sync and async might be mixed, as mentioned
> > before (Secrets Backend), but those could be handled by wrapping the sync
> > code with one of the following (see the sketch after this list):
> > - sync_to_async
> > - asyncio.to_thread (Python 3.9+)
> > - one of the anyio capabilities for running sync code from async
> >   (in threads or processes)
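> >
> > As a minimal illustration of the second option (the function here is just
> > a stand-in for a synchronous Secrets Backend call):
> >
> >     import asyncio
> >     import time
> >
> >     def blocking_lookup() -> str:
> >         # simulates a blocking lookup through a synchronous client
> >         time.sleep(1)
> >         return "secret-value"
> >
> >     async def main():
> >         # asyncio.to_thread (Python 3.9+) runs the sync function in a
> >         # worker thread without blocking the event loop
> >         value = await asyncio.to_thread(blocking_lookup)
> >         print(value)
> >
> >     asyncio.run(main())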
> >
> >
> >
> > On Tue, 9 Apr 2024 at 00:45, Daniel Imberman
> > <daniel.imber...@gmail.com>
> > wrote:
> >
> > > Yeah, do we have concrete examples of places where asyncio would be a
> > > non-starter? Are there enough of these examples to kill this idea? I
> > > really don't like the idea of needing to maintain both sync and async.
> > >
> > > On Mon, Apr 8, 2024 at 1:39 PM Hussein Awala <huss...@awala.fr> wrote:
> > >
> > > > > we definitely need a way to opt-out for the ones who aren't interested
> > > >
> > > > I disagree; what I propose is to infer the async connection from the
> > > > sync configuration using a translation method, with the possibility of
> > > > providing the async connection configuration explicitly. This will help
> > > > to completely migrate the REST API and web server to the async version
> > > > without duplicating the code.
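> > > >
> > > > As a rough sketch of what such a translation method could look like
> > > > (the helper and the driver mapping below are illustrative, not a final
> > > > design):
> > > >
> > > >     from sqlalchemy.engine import make_url
> > > >
> > > >     _ASYNC_DRIVERS = {
> > > >         "postgresql": "postgresql+asyncpg",
> > > >         "mysql": "mysql+aiomysql",
> > > >         "sqlite": "sqlite+aiosqlite",
> > > >     }
> > > >
> > > >     def to_async_url(sync_url: str) -> str:
> > > >         # derive the async SQLAlchemy URL from the configured sync one
> > > >         url = make_url(sync_url)
> > > >         backend = url.get_backend_name()  # e.g. "postgresql"
> > > >         return str(url.set(drivername=_ASYNC_DRIVERS.get(backend, url.drivername)))
> > > >
> > > >     print(to_async_url("postgresql://airflow@localhost/airflow"))
> > > >     # -> postgresql+asyncpg://airflow@localhost/airflow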
> > > >
> > > > > We should have a seamless fallback to sync if async doesn't work for
> > > > > whatever reasons
> > > >
> > > > For the async version of connections/variables, we will use the sync
> > > > method wrapped by sync_to_async in the base class. In this case, the
> > > > async methods will work with custom secrets backends without any issues,
> > > > and users can override the async methods with a better implementation.
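> > > >
> > > > In the base class that could be as simple as something like this (the
> > > > method names are illustrative, not the final API):
> > > >
> > > >     from asgiref.sync import sync_to_async
> > > >
> > > >     class BaseSecretsBackend:
> > > >         def get_connection(self, conn_id: str):
> > > >             """Existing synchronous lookup, overridden by concrete backends."""
> > > >             raise NotImplementedError
> > > >
> > > >         async def aget_connection(self, conn_id: str):
> > > >             # default async version: run the sync lookup in a worker
> > > >             # thread so existing custom backends keep working; backends
> > > >             # with a native async client can override this directly
> > > >             return await sync_to_async(self.get_connection)(conn_id)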
> > > >
> > > > > are we limiting the scope to let's say connections + variables and
> > > > > expanding based on the results in the long term?
> > > >
> > > > This needs to be implemented step by step: the first step is to add
> > > > the integration for the different providers and the DB, then implement
> > > > an async version of the secrets backends, then migrate the REST API and
> > > > web server, and later migrate our official executors, which will also
> > > > need integrating other tools like kubernetes-asyncio and async
> > > > integration for Celery.
> > > >
> > > > > I think this needs to be an all or nothing thing
> > > >
> > > > Here are some of the available drivers:
> > > > https://github.com/apache/airflow/pull/36504#issuecomment-1872653755
> > > > I have already tested one for each database, so we will have async
> > > > support for all supported databases.
> > > >
> > > > > having to maintain sync and async versions of functions/features is a
> > > > > non-starter in my mind;
> > > >
> > > > During the migration, we will have both sync and async endpoints in the
> > > > API and the webserver (they will be migrated one by one and not all at
> > > > the same time), but without any code duplication: in the worst case,
> > > > instead of duplicating a method, we can use sync_to_async and optimize
> > > > it later, after migrating all endpoints that use it.
> > > > But for Secrets Backends, we may have some duplicated code when it is
> > > > not possible to extract it into a common method shared between the sync
> > > > and async versions.
> > > >
> > > > > how can we keep one codebase but cope with sqlite?
> > > >
> > > > For my PoC, I used https://github.com/omnilib/aiosqlite and it worked
> > > > without any issues.
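> > > >
> > > > With the sqlalchemy asyncio extension that is just another driver name
> > > > in the URL; a minimal sketch:
> > > >
> > > >     import asyncio
> > > >     from sqlalchemy import text
> > > >     from sqlalchemy.ext.asyncio import create_async_engine
> > > >
> > > >     async def main():
> > > >         # aiosqlite provides the async DBAPI behind sqlite+aiosqlite://
> > > >         engine = create_async_engine("sqlite+aiosqlite:///:memory:")
> > > >         async with engine.connect() as conn:
> > > >             result = await conn.execute(text("SELECT 1"))
> > > >             print(result.scalar())
> > > >         await engine.dispose()
> > > >
> > > >     asyncio.run(main())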
> > > >
> > > >
> > > >
> > > > On Mon, Apr 8, 2024 at 10:08 PM Daniel Standish
> > > > <daniel.stand...@astronomer.io.invalid> wrote:
> > > >
> > > > > If nothing else, write an ugly adapter using sync_to_async?
> > > > >
> > > > > On Mon, Apr 8, 2024 at 1:06 PM Daniel Standish <
> > > > > daniel.stand...@astronomer.io> wrote:
> > > > >
> > > > > > https://github.com/omnilib/aiosqlite maybe?
> > > > > >
> > > > > > On Mon, Apr 8, 2024 at 1:03 PM Scheffler Jens
> > > > > > (XC-AS/EAE-ADA-T) <jens.scheff...@de.bosch.com.invalid> wrote:
> > > > > >
> > > > > >> I understand the „all-in“ approach as we were dropping MSSQL… how
> > > > > >> can we keep one codebase but cope with sqlite? I assume we must
> > > > > >> support this at least for dev setups.
> > > > > >>
> > > > > >> Sent from Outlook for iOS<https://aka.ms/o0ukef>
> > > > > >> ________________________________
> > > > > >> From: Jarek Potiuk <ja...@potiuk.com>
> > > > > >> Sent: Monday, April 8, 2024 8:30:18 PM
> > > > > >> To: dev@airflow.apache.org <dev@airflow.apache.org>
> > > > > >> Subject: Re: [DISCUSS] Asynchronous SQLAlchemy
> > > > > >>
> > > > > >> Yep. If we can make both Postgres and MySQL work with async - I am
> > > > > >> also all for the "all" approach. If it means that we need to
> > > > > >> support only certain drivers and certain versions of the DBs - so
> > > > > >> be it. As mentioned in my original comments (a long time ago, when
> > > > > >> we had MSSQL support) - this was not really possible back then -
> > > > > >> but now, by getting rid of MSSQL, and if we have the right drivers
> > > > > >> for MySQL, it should be possible - I guess.
> > > > > >>
> > > > > >> On Mon, Apr 8, 2024 at 8:17 PM Daniel Standish
> > > > > >> <daniel.stand...@astronomer.io.invalid> wrote:
> > > > > >>
> > > > > >> > I wholeheartedly agree with Ash that it should be all or nothing.
> > > > > >> > And *all* sounds better to me :)
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > On Mon, Apr 8, 2024 at 10:54 AM Ash Berlin-Taylor <a...@apache.org>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > I’m all in favour of async SQLAlchemy. We’ve built two products
> > > > > >> > > at Astronomer that use sqlalchemy+psycopg3+async exclusively,
> > > > > >> > > and love it. Async does take a bit of a learning curve, but SQLA
> > > > > >> > > has done it nicely and it works really well.
> > > > > >> > >
> > > > > >> > > I think this needs to be an all or nothing thing — having to
> > > > > >> > > maintain sync and async versions of functions/features is a
> > > > > >> > > non-starter in my mind; it’d just be a worryingly large amount
> > > > > >> > > of duplicated work. Given the only DBs we support now are
> > > > > >> > > Postgres and MySQL, I can’t think of any reason users should
> > > > > >> > > even care — they give it a DSN and that’s the end of their
> > > > > >> > > involvement.
> > > > > >> > >
> > > > > >> > > Amogh: I don’t understand what you mean by point 3 below.
> > > > > >> > >
> > > > > >> > > -ash
> > > > > >> > >
> > > > > >> > > > On 8 Apr 2024, at 05:31, Amogh Desai <amoghdesai....@gmail.com>
> > > > > >> > > > wrote:
> > > > > >> > > >
> > > > > >> > > > I checked the content and the PR that you attached.
> > > > > >> > > >
> > > > > >> > > > The results do seem promising and I like the general idea of
> > > > > >> > > > this approach. But as Jarek also mentioned on the PR:
> > > > > >> > > >
> > > > > >> > > > 1. Not everyone might be on board to go all async due to
> > > > > >> > > > certain limitations around access to the drivers, or corporate
> > > > > >> > > > limitations. So, we definitely need a way to opt out for the
> > > > > >> > > > ones who aren't interested.
> > > > > >> > > >
> > > > > >> > > > 2. We should have a seamless fallback to sync if async doesn't
> > > > > >> > > > work for whatever reasons.
> > > > > >> > > >
> > > > > >> > > > 3. Are we going all in, or are we limiting the scope to, let's
> > > > > >> > > > say, connections + variables and expanding based on the
> > > > > >> > > > results in the long term?
> > > > > >> > > >
> > > > > >> > > > Looking forward to the improvements async can bring in!
> > > > > >> > > >
> > > > > >> > > > Thanks & Regards,
> > > > > >> > > > Amogh Desai
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > On Sun, Apr 7, 2024 at 3:13 AM Hussein Awala <huss...@awala.fr>
> > > > > >> > > > wrote:
> > > > > >> > > >
> > > > > >> > > >> The Metadata Database is the brain of Airflow: all scheduling
> > > > > >> > > >> decisions, cross-communication, synchronization between
> > > > > >> > > >> components, and management via the web server are made using
> > > > > >> > > >> this database.
> > > > > >> > > >>
> > > > > >> > > >> One option to optimize the DB queries is to merge many into a
> > > > > >> > > >> single query to reduce latency and overall time, but this is
> > > > > >> > > >> not always possible because the queries are sometimes
> > > > > >> > > >> completely independent, and it is impossible/too complicated
> > > > > >> > > >> to merge them. But in this case we have another option, which
> > > > > >> > > >> is running them concurrently, since they are independent. The
> > > > > >> > > >> only way to do this currently is to use multithreading (the
> > > > > >> > > >> sync_to_async decorator creates a thread and waits for it
> > > > > >> > > >> using an asyncio coroutine), which is already a good start,
> > > > > >> > > >> but by using the asyncio extension for sqlalchemy we will be
> > > > > >> > > >> able to create thousands of lightweight coroutines with the
> > > > > >> > > >> same amount of resources as a few threads, which will also
> > > > > >> > > >> help to reduce resource consumption.
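> > > > > >> > > >>
> > > > > >> > > >> As a rough sketch of what running independent queries
> > > > > >> > > >> concurrently looks like with the extension (the URL and the
> > > > > >> > > >> queries below are only examples):
> > > > > >> > > >>
> > > > > >> > > >>     import asyncio
> > > > > >> > > >>     from sqlalchemy import text
> > > > > >> > > >>     from sqlalchemy.ext.asyncio import create_async_engine
> > > > > >> > > >>
> > > > > >> > > >>     engine = create_async_engine(
> > > > > >> > > >>         "postgresql+asyncpg://airflow@localhost/airflow"
> > > > > >> > > >>     )
> > > > > >> > > >>
> > > > > >> > > >>     async def run_query(sql: str):
> > > > > >> > > >>         async with engine.connect() as conn:
> > > > > >> > > >>             result = await conn.execute(text(sql))
> > > > > >> > > >>             return result.fetchall()
> > > > > >> > > >>
> > > > > >> > > >>     async def main():
> > > > > >> > > >>         # two independent queries run concurrently on separate
> > > > > >> > > >>         # pooled connections, without spawning any threads
> > > > > >> > > >>         dags, runs = await asyncio.gather(
> > > > > >> > > >>             run_query("SELECT dag_id FROM dag LIMIT 10"),
> > > > > >> > > >>             run_query("SELECT run_id FROM dag_run LIMIT 10"),
> > > > > >> > > >>         )
> > > > > >> > > >>         await engine.dispose()
> > > > > >> > > >>
> > > > > >> > > >>     asyncio.run(main())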
> > > > > >> > > >>
> > > > > >> > > >> A few months ago I started a PoC to add support for this
> > > > > >> > > >> extension and implement an asynchronous version of connections
> > > > > >> > > >> and variables, to be able to get/set them from triggers
> > > > > >> > > >> without blocking the event loop and affecting the performance
> > > > > >> > > >> of the triggerer, and the result was impressive
> > > > > >> > > >> (https://github.com/apache/airflow/pull/36504).
> > > > > >> > > >>
> > > > > >> > > >> I see a good opportunity to improve the performance of our
> > > > > >> > > >> REST API and web server (for example
> > > > > >> > > >> https://github.com/apache/airflow/issues/38776), knowing that
> > > > > >> > > >> we can mix sync and async endpoints, which will help with a
> > > > > >> > > >> smooth migration.
> > > > > >> > > >>
> > > > > >> > > >> I also think that it will be possible (and very useful) to
> > > > > >> > > >> migrate some of our executors (Kubernetes and Celery) to a
> > > > > >> > > >> fully asynchronous version to improve their performance.
> > > > > >> > > >>
> > > > > >> > > >> I use the sqlalchemy asyncio extension in many personal and
> > > > > >> > > >> company projects, and I'm very happy with it, but I would like
> > > > > >> > > >> to hear from others if they have any positive or negative
> > > > > >> > > >> feedback about it.
> > > > > >> > > >>
> > > > > >> > > >> I will create a new AIP for integrating the asyncio extension
> > > > > >> > > >> of sqlalchemy, and other follow-up AIPs to migrate/support
> > > > > >> > > >> each component once the first one is implemented, but first I
> > > > > >> > > >> prefer to check what the community and other committers think
> > > > > >> > > >> about this integration.
> > > > > >> > > >>
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
