Hi,

I am looking at speeding up the scheduler. Currently, loop times increase with
the number of tasks in a dag. This is due to TaskInstance.are_dependencies_met
executing several aggregation functions on the database. These calls are
expensive: between 0.05s and 0.15s per task, and the call is made twice in
every scheduler loop. This call is where the scheduler spends around 90% of its
time when evaluating dags, and it is the reason people who have a large number
of tasks per dag see quite large loop times (north of 600s).
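
As a rough back-of-envelope check of those numbers, here is a small Python
sketch. It assumes the call is made twice per task per loop and that the whole
loop time goes into this call (the figure above is closer to 90%); the function
names are made up for illustration and are not part of the scheduler.

```python
# Back-of-envelope model of scheduler loop time.
# Assumptions: the dependency check runs twice per task per loop,
# and it dominates the loop time completely.
CALLS_PER_LOOP = 2


def estimated_loop_seconds(n_tasks, seconds_per_call):
    """Estimated loop time for n_tasks at a given cost per dependency check."""
    return CALLS_PER_LOOP * n_tasks * seconds_per_call


def tasks_for_loop_time(loop_seconds, seconds_per_call):
    """Number of tasks that would produce the given loop time."""
    return loop_seconds / (CALLS_PER_LOOP * seconds_per_call)
```

Under these assumptions, a 600s loop corresponds to roughly 2000 tasks at 0.15s
per call and roughly 6000 tasks at 0.05s per call.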

I see 2 options to optimize the loop without going to a multiprocessing
approach, which would just push the problem down the line (i.e. to the db, or
to the point where you don’t have enough cores anymore).

1. Cache the call to TI.are_dependencies_met, either by caching in something
like memcache or by removing the need for the double call (update_state and
process_dag both make the call to TI.are_dependencies_met). This would more or
less cut the time in half.

2. Notify the downstream tasks of a state change of an upstream task. This would
remove the need for the aggregation, as the task would just ‘know’. It is a bit
harder to implement correctly, as you need to make sure you stay in a
consistent state. Obviously you could still run an integrity check once in a
while. This option would make the aggregation event based and significantly
reduce the time spent here, to around 1-5% of what the current scheduler
spends. There is a slight overhead added at a state change of the TaskInstance
(managed by the TaskInstance itself).
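
To make the two options concrete, here is a minimal in-memory sketch in Python.
It is not Airflow code: Task, LoopCache, unmet_upstream and recount are
hypothetical names, the database is left out entirely, and only an "all
upstream tasks succeeded" rule is modelled.

```python
SUCCESS = "success"


class Task:
    """Hypothetical stand-in for a TaskInstance (not Airflow's class)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.state = None
        self.upstream = []
        self.downstream = []
        # Option 2: number of upstream tasks that have not succeeded yet,
        # kept up to date on every state change.
        self.unmet_upstream = 0

    def set_upstream(self, other):
        self.upstream.append(other)
        other.downstream.append(self)
        if other.state != SUCCESS:
            self.unmet_upstream += 1

    def set_state(self, new_state):
        """Option 2: notify downstream tasks when the state changes."""
        was_done = self.state == SUCCESS
        is_done = new_state == SUCCESS
        self.state = new_state
        if was_done == is_done:
            return  # nothing changed from the downstream point of view
        delta = -1 if is_done else 1
        for child in self.downstream:
            child.unmet_upstream += delta

    def are_dependencies_met(self):
        # Cheap check: the task already 'knows', no aggregation needed.
        return self.unmet_upstream == 0

    def recount(self):
        """Integrity check: recompute the counter the expensive way."""
        return sum(1 for t in self.upstream if t.state != SUCCESS)


class LoopCache:
    """Option 1: remember the result of a check for one scheduler loop."""

    def __init__(self, check):
        self._check = check
        self._results = {}
        self.calls = 0  # how often the expensive check actually ran

    def get(self, task):
        if task.task_id not in self._results:
            self.calls += 1
            self._results[task.task_id] = self._check(task)
        return self._results[task.task_id]
```

With a LoopCache created at the start of each loop, the second caller gets the
stored result instead of hitting the database again. With the counter, the
periodic integrity check would compare recount() against unmet_upstream and
repair any drift.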

What do you think? My preferred option is #2. Am I missing any other options?
Are scheduler loop times a concern at all?

Thanks
Bolke

