I'd rather the scheduler delegate that to one of the minions (a subprocess) if possible. We should keep everything we can off the main thread.
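Here is a minimal sketch of handing that work to a subprocess, using Python's standard `concurrent.futures`. The scheduler loop submits the stale-DAG pass to a worker process and polls for the result instead of blocking on it. `find_stale_dags` and `StaleDagJanitor` are hypothetical names. `find_stale_dags` only computes which DAG ids are stale from an in-memory dict. It is not Airflow's `models.DAG.deactivate_stale_dags`, which updates the metadata database.

```python
import time
from concurrent.futures import Executor, Future, ProcessPoolExecutor
from datetime import datetime, timedelta
from typing import Dict, List, Optional


def find_stale_dags(cutoff: datetime, last_touched: Dict[str, datetime]) -> List[str]:
    """Hypothetical stand-in for the work behind models.DAG.deactivate_stale_dags:
    return the ids of DAGs whose files were not processed since `cutoff`."""
    return sorted(dag_id for dag_id, ts in last_touched.items() if ts < cutoff)


class StaleDagJanitor:
    """Hypothetical helper: runs the stale-DAG pass on an executor so the
    scheduler loop can poll for the result instead of blocking on it."""

    def __init__(self, executor: Executor) -> None:
        self._executor = executor
        self._pending: Optional[Future] = None

    def maybe_submit(self, cutoff: datetime, last_touched: Dict[str, datetime]) -> bool:
        # Keep at most one pass in flight; the caller must poll() before resubmitting.
        if self._pending is not None:
            return False
        self._pending = self._executor.submit(find_stale_dags, cutoff, last_touched)
        return True

    def poll(self) -> Optional[List[str]]:
        # Non-blocking: None while the worker is still running (or nothing was submitted).
        if self._pending is None or not self._pending.done():
            return None
        # Clear the slot first so a worker exception doesn't wedge the janitor.
        future, self._pending = self._pending, None
        return future.result()  # re-raises any exception from the worker


if __name__ == "__main__":
    now = datetime(2018, 8, 22, 12, 0)
    touched = {"etl_daily": now, "old_report": now - timedelta(days=30)}
    with ProcessPoolExecutor(max_workers=1) as pool:
        janitor = StaleDagJanitor(pool)
        janitor.maybe_submit(now - timedelta(days=1), touched)
        stale = None
        while stale is None:
            # A real scheduler loop would do its other work here.
            time.sleep(0.1)
            stale = janitor.poll()
    print(stale)  # ['old_report']
```

Because the helper accepts any `Executor`, you can swap in a `ThreadPoolExecutor` for tests. In a real deployment the child process would need to open its own database session rather than reuse the parent's connection.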
BTW, I've been talking about renaming the scheduler to "supervisor" for a while now. While renaming may be a bit tricky (it means updating all references in the code), we should think of the scheduler as more of a supervisor, since it takes on all sorts of supervision-related tasks.

Tangent: we need to start thinking about allowing for a distributed scheduler too, and I think we need to be careful around tasks that shouldn't be parallelized (this may or may not be one of them). We'll need to do very basic leader election and take/release locks while running these tasks. I'm thinking we can just set flags in the database to do that.

Max

On Wed, Aug 22, 2018 at 12:19 PM Taylor Edmiston wrote:

> I'm not super familiar with this part of the scheduler. What exactly are
> the implications of doing this mid-loop vs at scheduler termination?
> Is there a use case where DAGs hit this besides having been deleted?
>
> The deactivate_stale_dags call doesn't appear to be super expensive or
> anything like that.
>
> This seems like a reasonable idea to me.
>
> *Taylor Edmiston*
> Blog <https://blog.tedmiston.com/> | CV
> <https://stackoverflow.com/cv/taylor> | LinkedIn
> <https://www.linkedin.com/in/tedmiston/> | AngelList
> <https://angel.co/taylor> | Stack Overflow
> <https://stackoverflow.com/users/149428/taylor-edmiston>
>
> On Wed, Aug 22, 2018 at 2:32 PM Dan Davydov wrote:
>
> > I see some PRs creating endpoints to delete DAGs and other things related
> > to manually deleting DAGs from the DB, but is there a good reason why we
> > can't just move the deactivating DAG logic into the main scheduler loop?
> >
> > The scheduler already has some code like this, but it only runs when the
> > Scheduler terminates:
> >
> >     if all_files_processed:
> >         self.log.info(
> >             "Deactivating DAGs that haven't been touched since %s",
> >             execute_start_time.isoformat()
> >         )
> >         models.DAG.deactivate_stale_dags(execute_start_time)
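Max's idea of setting flags in the database to take and release locks could be sketched as a lease-style lock row. This is a minimal sketch under a few assumptions. The `task_lock` table and the `init_lock_table`, `try_acquire` and `release` helpers are hypothetical. The standard-library `sqlite3` module stands in for the metadata database, and `INSERT OR IGNORE` is SQLite syntax (PostgreSQL would use `INSERT ... ON CONFLICT DO NOTHING`). The expiry (`ttl_s`) is also an assumption. It keeps a scheduler that crashes while holding a lock from blocking the others forever, but it relies on the schedulers' clocks being roughly in sync.

```python
import sqlite3
import time
from typing import Optional


def init_lock_table(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: one row per named task lock, held until expires_at.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS task_lock ("
        " name TEXT PRIMARY KEY,"
        " holder TEXT NOT NULL,"
        " expires_at REAL NOT NULL)"
    )
    conn.commit()


def try_acquire(conn: sqlite3.Connection, name: str, holder: str,
                ttl_s: float, now: Optional[float] = None) -> bool:
    """Take (or renew) the lock `name` for `holder`; return True on success."""
    now = time.time() if now is None else now
    # Take over an existing row only if its lease has expired or we already hold it.
    cur = conn.execute(
        "UPDATE task_lock SET holder = ?, expires_at = ? "
        "WHERE name = ? AND (expires_at < ? OR holder = ?)",
        (holder, now + ttl_s, name, now, holder),
    )
    if cur.rowcount == 0:
        # Either another scheduler holds a live lease, or no row exists yet.
        # The primary key makes this insert a no-op if someone else got there first.
        cur = conn.execute(
            "INSERT OR IGNORE INTO task_lock (name, holder, expires_at) VALUES (?, ?, ?)",
            (name, holder, now + ttl_s),
        )
    conn.commit()
    return cur.rowcount == 1


def release(conn: sqlite3.Connection, name: str, holder: str) -> None:
    # Only the current holder can release; anyone else's call is a no-op.
    conn.execute("DELETE FROM task_lock WHERE name = ? AND holder = ?", (name, holder))
    conn.commit()
```

A scheduler that wins the lock for a name like `deactivate_stale_dags` runs the task and then calls `release`. The others skip the task until the lock is released or the lease expires. A long-running holder would need to call `try_acquire` again to renew before `ttl_s` elapses.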
