Does anybody have experience with writing this kind of state change code in stored procedures?
On Sat, Jun 4, 2016 at 6:39 AM, Bolke de Bruin <[email protected]> wrote:
>
>> On Jun 4, 2016, at 03:16, Maxime Beauchemin <[email protected]> wrote:
>>
>> Caching is a last resort solution and probably not a good thing here.
>> It would introduce lag and confusion.
>
> Agreed.
>
>> You seem to say that some things are evaluated twice within a scheduler
>> cycle? What would that be?
>
> In the old scheduler (before AIRFLOW-128, the refactor of process_dag) it
> was actually 3 times, although the impact was probably closer to 2 times
> thanks to the query cache:
>
> 1. get_active_runs was calling ti.are_dependencies_met twice: first as
> part of the overall deadlock check and, because the results were not
> stored, again for the individual dag_run deadlock check.
> 2. process_dag calls ti.is_runnable(), which in turn also calls
> ti.are_dependencies_met().
>
> In the new version DagRun.update_state() gets called, which is more or
> less a copy of get_active_runs except that I store the results of the
> deadlock checks. process_dag still calls ti.is_runnable() at the moment.
>
>> Another option is to reduce the number of database interactions and make
>> sure indexes are in place and balanced (you may want to rebuild your
>> indexes and make sure they are getting used). If the DB is the point of
>> contention, we can assume that getting a fast DB, or rationalizing
>> overall DB interactions (less frequent heartbeats?), would also help.
>
> I actually verified whether this would make a difference as part of
> AIRFLOW-128. It doesn't really. Aggregation functions always require a
> table scan, and ti.are_dependencies_met() uses 5 of them (though they
> probably get optimized to one).
>
>> Another option is *distributed scheduling*, where the worker, after
>> finishing a task instance, would evaluate [only] the directly downstream
>> tasks for execution. In theory the scheduler could just seed the DAG and
>> the downstream tasks would be triggered by their parents, immediately
>> after, almost like *callbacks*. Then it matters a lot less how long
>> scheduling cycles actually take…
>
> Absolutely. I am tinkering with having a "notify_downstream" function in
> the task instances that calls an "update" function on the downstream
> task. It is conceptually quite a small step to check if the trigger rules
> have been met and then notify the scheduler that the task can be
> scheduled. If you think about it further, you could probably remove the
> scheduler logic for determining which task gets scheduled by putting the
> TaskInstance on a MQ and letting the workers decide which one they can
> pick up. The scheduler would then just do some garbage collection. I
> think this could be the next step, not one I am prepared to undertake
> right now. I think a large speed-up can be gotten from the initial
> suggestion. I am working on a PoC that will allow us to see the
> difference in speed.
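For illustration, a rough sketch of the event-based idea described above: a
finished task pushes its state to its direct downstream task instances, which
keep an incremental record of upstream states and report themselves as runnable
once their trigger rule is met. All names below (MiniTaskInstance,
upstream_changed, MiniDagRun, downstream_of) are hypothetical stand-ins rather
than actual Airflow APIs, and persistence and locking (the consistency concern
mentioned in the thread) are ignored.

from collections import defaultdict


class MiniTaskInstance:
    def __init__(self, task_id, upstream_ids, trigger_rule="all_success"):
        self.task_id = task_id
        self.upstream_ids = set(upstream_ids)
        self.trigger_rule = trigger_rule
        self.state = None
        self.upstream_states = {}  # maintained incrementally, no aggregation query

    def set_state(self, state, dag_run):
        # Record the state change and push it to direct downstream tasks.
        self.state = state
        if state in ("success", "failed", "skipped"):
            for ti in dag_run.downstream_of(self.task_id):
                ti.upstream_changed(self.task_id, state, dag_run)

    def upstream_changed(self, upstream_id, state, dag_run):
        # Called by an upstream task instead of the scheduler querying the db.
        self.upstream_states[upstream_id] = state
        if self.trigger_rule_met():
            dag_run.runnable.append(self)  # tell the scheduler it can be queued

    def trigger_rule_met(self):
        states = [self.upstream_states.get(u) for u in self.upstream_ids]
        if self.trigger_rule == "all_success":
            return all(s == "success" for s in states)
        if self.trigger_rule == "one_success":
            return any(s == "success" for s in states)
        return False


class MiniDagRun:
    def __init__(self, tasks):
        self.tasks = {t.task_id: t for t in tasks}
        self.runnable = []
        self._downstream = defaultdict(list)
        for t in tasks:
            for up in t.upstream_ids:
                self._downstream[up].append(t)

    def downstream_of(self, task_id):
        return self._downstream[task_id]


# When "extract" succeeds, "load" becomes runnable without any database call.
extract = MiniTaskInstance("extract", [])
load = MiniTaskInstance("load", ["extract"])
run = MiniDagRun([extract, load])
extract.set_state("success", run)
print([t.task_id for t in run.runnable])  # ['load']

The design point is that the information travels with the state change, so
deciding that "load" is runnable needs no aggregation query over the
task_instance table.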
>> Max
>>
>> On Fri, Jun 3, 2016 at 2:26 PM, Bolke de Bruin <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am looking at speeding up the scheduler. Currently loop times increase
>>> with the number of tasks in a dag. This is due to
>>> TaskInstance.are_dependencies_met executing several aggregation
>>> functions on the database. These calls are expensive: between 0.05-0.15s
>>> per task, and every scheduler loop makes them twice. This call is where
>>> the scheduler spends around 90% of its time when evaluating dags and is
>>> the reason why people with a large number of tasks per dag see quite
>>> large loop times (north of 600s).
>>>
>>> I see 2 options to optimize the loop without going to a multiprocessing
>>> approach, which would just push the problem down the line (i.e. to the
>>> db, or to when you don't have enough cores anymore).
>>>
>>> 1. Cache the call to TI.are_dependencies_met, either by caching in
>>> something like memcache or by removing the need for the double call
>>> (update_state and process_dag both call TI.are_dependencies_met). This
>>> would more or less cut the time in half.
>>>
>>> 2. Notify the downstream tasks of a state change of an upstream task.
>>> This would remove the need for the aggregation, as the task would just
>>> "know". It is a bit harder to implement correctly, as you need to make
>>> sure you stay in a consistent state. Obviously you could still run an
>>> integrity check once in a while. This option would make the aggregation
>>> event based and significantly reduce the time spent here, to around 1-5%
>>> of the current scheduler. There is a slight overhead added at a state
>>> change of the TaskInstance (managed by the TaskInstance itself).
>>>
>>> What do you think? My preferred option is #2. Am I missing any other
>>> options? Are scheduler loop times a concern at all?
>>>
>>> Thanks
>>> Bolke

--
Lance Norskog
[email protected]
Redwood City, CA
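As a companion to the sketch above, option #1 from Bolke's original mail
(removing the duplicate are_dependencies_met call within a single scheduler
pass) could look roughly like this. scheduler_pass and
are_dependencies_met_uncached are made-up names standing in for the real
scheduler loop and the query-heavy dependency check, and the deadlock condition
is simplified; at best this halves the loop time, as noted in the thread.

import functools


def scheduler_pass(task_instances, are_dependencies_met_uncached):
    # Memoize the expensive check for the duration of one scheduler pass,
    # so the deadlock check and the "what is runnable" check share results.
    # Task instance keys must be hashable (plain ids in this toy example).
    @functools.lru_cache(maxsize=None)
    def deps_met(ti):
        return are_dependencies_met_uncached(ti)

    # First consumer: a (simplified) per-dag_run deadlock check.
    deadlocked = all(not deps_met(ti) for ti in task_instances)

    # Second consumer: deciding what is runnable. This hits the cache,
    # so the query-heavy call runs once per task instance per loop.
    runnable = [ti for ti in task_instances if deps_met(ti)]
    return deadlocked, runnable


# Toy usage: pretend only task "b" currently has its dependencies met.
deadlocked, runnable = scheduler_pass(["a", "b", "c"], lambda ti: ti == "b")
print(deadlocked, runnable)  # False ['b']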
