> On 4 Jun 2016, at 03:16, Maxime Beauchemin <[email protected]> wrote:
>
> Caching is a last resort solution and probably not a good thing here. It would introduce lag and confusion.
Agreed.

> You seem to say that some things are evaluated twice within a scheduler cycle? What would that be?

In the old scheduler (before AIRFLOW-128, the refactor of process_dag) it was actually 3 times, although the impact was probably closer to 2 times due to the query cache:

1. get_active_runs was calling ti.are_dependencies_met twice: first as part of the overall deadlock check, and, because the results were not stored, again for the individual dag_run deadlock check.
2. process_dag calls ti.is_runnable(), which in turn also calls ti.are_dependencies_met().

In the new version DagRun.update_state() gets called, which is more or less a copy of get_active_runs except that I store the results of the deadlock checks. process_dag still calls ti.is_runnable() at the moment.

> Another option is to reduce the number of database interactions and make sure indexes are in place and balanced (you may want to rebuild your indexes and make sure they are getting used). If the DB is the point of contention, we can assume that getting a fast DB, or rationalizing overall DB interactions (less frequent heartbeats?), would also help.

I actually verified whether this would make a difference as part of AIRFLOW-128. It doesn't, really. Aggregation functions always require a table scan, and ti.are_dependencies_met() uses 5 of them (though they probably get optimized into one).

> Another option is *distributed scheduling*, where the worker, as part of finishing a task instance, would evaluate [only] the directly downstream tasks for execution. In theory the scheduler could just seed the DAG and the downstream tasks would be triggered by their parents, immediately after, almost like *callbacks*. Then it matters a lot less how long scheduling cycles actually take…

Absolutely. I am tinkering with having a "notify_downstream" function in the task instances that calls an "update" function on the downstream tasks. It is conceptually quite a small step to check whether the trigger rules have been met and then notify the scheduler that the task can be scheduled. If you think about it further, you could probably remove the scheduler logic for determining which task gets scheduled altogether by putting the TaskInstance on an MQ and letting the workers decide which ones they can pick up. The scheduler would then just do some garbage collection. I think this could be the next step, but not one I am prepared to undertake right now. I think a large speed-up can be gained from the initial suggestion. I am working on a PoC that will show the difference in speed.
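To make the notification idea a bit more concrete, here is a minimal, self-contained sketch of what I am tinkering with. It is illustration only, not Airflow code: notify_downstream, update_upstream_state and the in-memory bookkeeping are made-up names, and only the simplest "all upstream succeeded" trigger rule is shown.

# Hypothetical sketch: push state changes to downstream task instances
# instead of having the scheduler re-aggregate them from the database
# every loop.

class SketchTaskInstance:
    def __init__(self, task_id, upstream_ids, scheduler):
        self.task_id = task_id
        self.upstream_ids = set(upstream_ids)
        self.succeeded_upstream = set()  # filled by notifications, not by queries
        self.downstream = []             # wired up below
        self.scheduler = scheduler
        self.state = None

    def update_upstream_state(self, upstream_id, state):
        # Called by an upstream task; replaces the aggregation query.
        if state == "success":
            self.succeeded_upstream.add(upstream_id)
        # "all_success" trigger rule as the simplest example
        if self.succeeded_upstream == self.upstream_ids:
            self.scheduler.mark_runnable(self)

    def set_state(self, state):
        self.state = state
        if state == "success":
            self.notify_downstream()

    def notify_downstream(self):
        for ti in self.downstream:
            ti.update_upstream_state(self.task_id, self.state)


class SketchScheduler:
    def __init__(self):
        self.runnable = []

    def mark_runnable(self, ti):
        print("%s is now runnable" % ti.task_id)
        self.runnable.append(ti)


if __name__ == "__main__":
    sched = SketchScheduler()
    # the root tasks a and b would be seeded by the scheduler directly
    a = SketchTaskInstance("a", [], sched)
    b = SketchTaskInstance("b", [], sched)
    c = SketchTaskInstance("c", ["a", "b"], sched)
    a.downstream = [c]
    b.downstream = [c]

    a.set_state("success")  # c still waits for b
    b.set_state("success")  # c becomes runnable without a single aggregation query

The consistency concern from my original mail (quoted below) still applies: if a notification is lost, the downstream task never learns about it, so a periodic integrity check against the database would remain necessary.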
> Max
>
> On Fri, Jun 3, 2016 at 2:26 PM, Bolke de Bruin <[email protected]> wrote:
>
>> Hi,
>>
>> I am looking at speeding up the scheduler. Currently loop times increase with the number of tasks in a dag. This is due to TaskInstance.are_dependencies_met executing several aggregation functions on the database. These calls are expensive: between 0.05 and 0.15s per task, and for every scheduler loop this gets called twice. This call is where the scheduler spends around 90% of its time when evaluating dags and is the reason that people with a large number of tasks per dag see quite large loop times (north of 600s).
>>
>> I see 2 options to optimize the loop without going to a multiprocessing approach, which would just push the problem down the line (i.e. to the db, or to when you don't have enough cores anymore).
>>
>> 1. Cache the call to TI.are_dependencies_met, by either caching in something like memcache or by removing the need for the double call (update_state and process_dag both make the call to TI.are_dependencies_met). This would more or less cut the time in half.
>>
>> 2. Notify the downstream tasks of a state change of an upstream task. This would remove the need for the aggregation as the task would just 'know'. It is a bit harder to implement correctly, as you need to make sure you stay in a consistent state. Obviously you could still run an integrity check once in a while. This option would make the aggregation event-based and significantly reduce the time spent here, to around 1-5% of the current scheduler. There is a slight overhead added at a state change of the TaskInstance (managed by the TaskInstance itself).
>>
>> What do you think? My preferred option is #2. Am I missing any other options? Are scheduler loop times a concern at all?
>>
>> Thanks
>> Bolke
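On option #1 above, a loop-local cache is probably enough to remove the double call without much of the lag/confusion concern, since it only lives for a single scheduler cycle. A rough sketch, with made-up helper names and a fake check standing in for TI.are_dependencies_met, just to show the effect:

# Sketch of option #1 only: memoize the dependency check for the duration
# of one scheduler loop, so the update_state()-style deadlock check and the
# process_dag()-style runnability check do not both pay for the queries.

def make_loop_cache(are_dependencies_met):
    """Wrap an expensive per-task-instance check in a per-loop cache."""
    cache = {}

    def cached(ti_key):
        if ti_key not in cache:
            cache[ti_key] = are_dependencies_met(ti_key)
        return cache[ti_key]

    return cached


if __name__ == "__main__":
    calls = []

    def expensive_check(ti_key):   # stands in for TI.are_dependencies_met,
        calls.append(ti_key)       # which costs 0.05-0.15s in the database
        return True

    deps_met = make_loop_cache(expensive_check)
    task_keys = [("example_dag", "task_%d" % i, "2016-06-03") for i in range(3)]

    # first pass: the update_state()/deadlock-style check pays for the queries
    met = [deps_met(k) for k in task_keys]
    deadlocked = not any(met)

    # second pass: the process_dag()/is_runnable-style check reuses the cache
    runnable = [k for k in task_keys if deps_met(k)]

    print(len(calls))  # 3 instead of 6: the second call per task is free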

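And, purely for completeness, the MQ idea in its most naive form; the queue payloads and checks are invented here, and a real implementation would still need the consistency and garbage-collection work mentioned above:

import queue

# Speculative sketch only: the scheduler merely seeds task instances onto a
# queue; workers check the trigger rule themselves and re-enqueue anything
# that is not runnable yet.

mq = queue.Queue()
done = set()

def seed(task_id, upstream_ids):
    mq.put({"task_id": task_id, "upstream": set(upstream_ids)})

def worker_step():
    ti = mq.get()
    if ti["upstream"] <= done:   # worker-side trigger-rule check
        print("running", ti["task_id"])
        done.add(ti["task_id"])
    else:
        mq.put(ti)               # dependencies not met yet, push it back

if __name__ == "__main__":
    seed("a", [])
    seed("c", ["a", "b"])
    seed("b", [])
    while not mq.empty():
        worker_step()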