But you could run them in a thread or subprocess.
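
To make the thread option concrete, here is a minimal sketch, assuming the timed events stay as ordinary blocking callables. `reap_zombies` and `run_timed_event_in_thread` are hypothetical names for illustration, not Airflow functions, and the sleep stands in for an expensive DB query.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def reap_zombies():
    """Hypothetical stand-in for a blocking timed event (e.g. a slow DB query)."""
    time.sleep(0.2)  # simulate the expensive query
    return "zombies reaped"


def run_timed_event_in_thread(executor, action):
    """Hand a blocking action to a worker thread and return its Future at once."""
    return executor.submit(action)


with ThreadPoolExecutor(max_workers=1) as executor:
    started = time.monotonic()
    future = run_timed_event_in_thread(executor, reap_zombies)
    # The caller gets control back here, before the slow action has finished.
    submit_elapsed = time.monotonic() - started
    # Blocking on the result is only for this demo; a real loop would check
    # future.done() on a later iteration instead.
    result = future.result()
```

The point is only that the submitting loop is not held up while the action runs; whether the DB session handling is safe to use from another thread is a separate question this sketch does not address.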

Another option would be to take all of the timed events, convert them all
to asyncio, and run them via asyncio in one continually running thread.
That would be a bite-sized step towards AIP-70.  Though, it might be a
large bite :)
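
Roughly what I have in mind, as a sketch only: an asyncio event loop that lives in its own thread, with the timed events scheduled onto it from the main thread. `TimedEventThread` and the tiny intervals are made up for illustration, and the timed events are assumed to already be coroutines.

```python
import asyncio
import threading
import time


class TimedEventThread:
    """Runs periodic coroutines on an asyncio loop that lives in its own thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)

    def start(self):
        self.thread.start()

    def call_regular_interval(self, interval, action):
        """Schedule the async callable `action` every `interval` seconds."""

        async def _repeat():
            while True:
                await asyncio.sleep(interval)
                await action()

        # Thread-safe way to submit a coroutine to a loop in another thread.
        return asyncio.run_coroutine_threadsafe(_repeat(), self.loop)

    def stop(self):
        """Cancel all periodic tasks, then stop and close the loop."""

        async def _shutdown():
            tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

        asyncio.run_coroutine_threadsafe(_shutdown(), self.loop).result()
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


ticks = []


async def detect_zombies():
    # Placeholder for a timed event; a real one would need async DB access.
    ticks.append(time.monotonic())


events = TimedEventThread()
events.start()
events.call_regular_interval(0.05, detect_zombies)
time.sleep(0.3)  # the main thread stands in for the blocking scheduler loop
events.stop()
```

Note this only helps if the timed events themselves never block: a synchronous DB call inside one coroutine would still stall every other event on that loop.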

On Fri, May 3, 2024 at 6:29 AM Hussein Awala <huss...@awala.fr> wrote:

> If we don't have many asyncio tasks running in the event loop, there won't
> be any benefit from migrating to asynchronous code. IMHO it will be
> rewritten to be asynchronous anyway as part of AIP-70
> <https://cwiki.apache.org/confluence/display/AIRFLOW/%5BWIP%5D+AIP-70+Migrating+to+asynchronous+programming>
> (WIP), where we will need to rewrite the scheduler if the AIP is accepted.
>
> On Fri, May 3, 2024 at 2:49 PM Ryan Hatter
> <ryan.hat...@astronomer.io.invalid> wrote:
>
> > This might be a dumb question as I don't have experience with asyncio, but
> > should the EventScheduler
> > <https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job_runner.py#L930>
> > in the Airflow scheduler be rewritten to be asynchronous?
> >
> > The so-called "timed events" (e.g. zombie reaping, handling tasks stuck in
> > queued, etc.) scheduled by this EventScheduler are blocking and run queries
> > against the DB that can occasionally be expensive and cause substantial
> > delays in the scheduler, which can result in repeated scheduler restarts.
> >
> > Below is a trivialized example of what this might look like -- curious to
> > hear your thoughts!
> >
> > import asyncio
> > import threading
> > import time
> >
> > class AsyncEventScheduler:
> >     def __init__(self):
> >         self.tasks = []
> >
> >     async def call_regular_interval(self, interval, action, *args, **kwargs):
> >         """Schedules action to be called every `interval` seconds
> >         asynchronously."""
> >         while True:
> >             await asyncio.sleep(interval)
> >             await action(*args, **kwargs)
> >
> >     def schedule_task(self, interval, action, *args, **kwargs):
> >         """Add tasks that run periodically in an asynchronous manner."""
> >         task = asyncio.create_task(
> >             self.call_regular_interval(interval, action, *args, **kwargs)
> >         )
> >         self.tasks.append(task)
> >
> > async def detect_zombies():
> >     print("🧟")
> >
> > async def detect_stuck_queued_tasks():
> >     print("Oh no! A task is stuck in queued!")
> >
> > def scheduler_loop():
> >     while True:
> >         print("Starting scheduling loop...")
> >         time.sleep(10)
> >
> > def _do_scheduling():
> >     thread = threading.Thread(target=scheduler_loop)
> >     thread.start()
> >
> > async def main():
> >     scheduler = AsyncEventScheduler()
> >     scheduler.schedule_task(3, detect_zombies)
> >     scheduler.schedule_task(5, detect_stuck_queued_tasks)
> >
> >     _do_scheduling()
> >
> >     while True:
> >         print("EventScheduler running")
> >         await asyncio.sleep(1)
> >
> > asyncio.run(main())
> >
>
