I've done something similar to this as triggers in MySQL. It's hard to do this in a generic way, though. Every SQL DB has its own way of expressing procedures (or none at all).
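Roughly the shape such a trigger takes, as a minimal sketch against Airflow's MySQL metadata schema (the task_instance table and its dag_id/task_id/execution_date/state columns are Airflow's; the ti_state_change side table, trigger name, and connection URI are made up for illustration):

# Hypothetical sketch: record every task_instance state transition in a
# side table via a MySQL trigger, so consumers can react to changes
# instead of re-aggregating state. ti_state_change is an invented table.
import sqlalchemy as sa

engine = sa.create_engine("mysql+pymysql://airflow:airflow@localhost/airflow")

SIDE_TABLE = """
CREATE TABLE IF NOT EXISTS ti_state_change (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    dag_id VARCHAR(250), task_id VARCHAR(250), execution_date DATETIME,
    old_state VARCHAR(20), new_state VARCHAR(20)
)
"""

TRIGGER = """
CREATE TRIGGER ti_state_change_trg
AFTER UPDATE ON task_instance
FOR EACH ROW
BEGIN
    IF NOT (NEW.state <=> OLD.state) THEN  -- NULL-safe: real transitions only
        INSERT INTO ti_state_change
            (dag_id, task_id, execution_date, old_state, new_state)
        VALUES
            (NEW.dag_id, NEW.task_id, NEW.execution_date, OLD.state, NEW.state);
    END IF;
END
"""

with engine.begin() as conn:
    conn.execute(sa.text(SIDE_TABLE))
    conn.execute(sa.text(TRIGGER))

The generic-portability problem is visible even here: the NULL-safe <=> operator and this trigger syntax are MySQL-specific, so each backend would need its own version.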
On Sat, Jun 4, 2016 at 3:32 PM, Lance Norskog <[email protected]> wrote:

> Does anybody have experience with writing this kind of state change code
> in stored procedures?
>
> On Sat, Jun 4, 2016 at 6:39 AM, Bolke de Bruin <[email protected]> wrote:
>
> > On 4 Jun 2016, at 03:16, Maxime Beauchemin <
> > [email protected]> wrote:
> >
> > > Caching is a last resort solution and probably not a good thing here.
> > > It would introduce lag and confusion.
> >
> > Agreed.
> >
> > > You seem to say that some things are evaluated twice within a
> > > scheduler cycle? What would that be?
> >
> > In the old scheduler (before AIRFLOW-128, the refactor of process_dag)
> > it was actually 3 times, although the impact was probably around 2
> > times due to the query cache:
> >
> > 1. get_active_runs was calling ti.are_dependencies_met twice: first as
> > part of the overall deadlock check, and, because the results were not
> > stored, again for the individual dag_run deadlock check.
> > 2. process_dag calls ti.is_runnable(), which in turn also calls
> > ti.are_dependencies_met().
> >
> > In the new version DagRun.update_state() gets called, which is more or
> > less a copy of get_active_runs except that I store the deadlock checks.
> > process_dag still calls ti.is_runnable() at the moment.
> >
> > > Another option is to reduce the number of database interactions and
> > > make sure indexes are in place and balanced (you may want to rebuild
> > > your indexes and make sure they are getting used). If the DB is the
> > > point of contention, we can assume that getting a fast DB, or
> > > rationalizing overall DB interactions (less frequent heartbeats?),
> > > would also help.
> >
> > I actually verified whether this would make a difference as part of
> > AIRFLOW-128. It doesn't really. Aggregation functions always require a
> > table scan, and ti.are_dependencies_met() uses 5 of them (though they
> > probably get optimized to one).
> >
> > > Another option is *distributed scheduling*, where the worker, after
> > > finishing a task instance, would evaluate [only] the directly
> > > downstream tasks for execution. In theory the scheduler could just
> > > seed the DAG and the downstream tasks would be triggered by their
> > > parents, immediately after, almost like *callbacks*. Then it matters
> > > a lot less how long scheduling cycles actually take…
> >
> > Absolutely. I am tinkering with having a "notify_downstream" function
> > in the task instances that calls an "update" function on the downstream
> > task. It is conceptually quite a small step to check whether the
> > trigger rules have been met and then notify the scheduler that the task
> > can be scheduled. If you think about it further, you could probably
> > remove the scheduler logic for determining which task gets scheduled by
> > putting the TaskInstance on a MQ and letting the workers decide which
> > one they can pick up. The scheduler would then just do some garbage
> > collection. I think this could be the next step, not one I am prepared
> > to undertake right now. I think a large speed-up can be gotten from the
> > initial suggestion. I am working on a PoC that will allow us to see the
> > difference in speed.
> >
> > > Max
> > >
> > > On Fri, Jun 3, 2016 at 2:26 PM, Bolke de Bruin <[email protected]>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> I am looking at speeding up the scheduler. Currently loop times
> > >> increase with the number of tasks in a dag. This is due to
> > >> TaskInstance.are_dependencies_met executing several aggregation
> > >> functions on the database. These calls are expensive: between 0.05
> > >> and 0.15s per task, and every scheduler loop makes this call twice.
> > >> This call is where the scheduler spends around 90% of its time when
> > >> evaluating dags, and it is the reason people with a large number of
> > >> tasks per dag see quite large loop times (north of 600s).
> > >>
> > >> I see 2 options to optimize the loop without going to a
> > >> multiprocessing approach, which would just push the problem down the
> > >> line (i.e. to the db, or to the point where you don't have enough
> > >> cores anymore).
> > >>
> > >> 1. Cache the call to TI.are_dependencies_met, either by caching in
> > >> something like memcache or by removing the need for the double call
> > >> (update_state and process_dag both call TI.are_dependencies_met).
> > >> This would more or less cut the time in half.
> > >>
> > >> 2. Notify the downstream tasks of a state change of an upstream
> > >> task. This would remove the need for the aggregation, as the task
> > >> would just 'know'. It is a bit harder to implement correctly, as you
> > >> need to make sure you remain in a consistent state. Obviously you
> > >> could still run an integrity check once in a while. This option
> > >> would make the aggregation event based and significantly reduce the
> > >> time spent here, to around 1-5% of the current scheduler. There is a
> > >> slight overhead added at a state change of the TaskInstance (managed
> > >> by the TaskInstance itself).
> > >>
> > >> What do you think? My preferred option is #2. Am I missing any other
> > >> options? Are scheduler loop times a concern at all?
> > >>
> > >> Thanks,
> > >> Bolke
>
> --
> Lance Norskog
> [email protected]
> Redwood City, CA
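To make option #2 concrete, here is a minimal, self-contained sketch of the event-based dependency tracking described in the thread. The class and method names are illustrative, not Airflow's actual API; only the default "all upstream succeeded" trigger rule is modeled, other trigger rules would need their own bookkeeping, and the integrity check Bolke mentions could periodically rebuild the pending sets from the database.

# Sketch of option #2: a finishing task pushes its state change to its
# direct downstream tasks, instead of each scheduler loop re-running
# aggregation queries per task instance (are_dependencies_met).
# Names are illustrative, not Airflow's actual API.

class TaskNode:
    def __init__(self, task_id, upstream_ids):
        self.task_id = task_id
        self.pending_upstream = set(upstream_ids)  # unmet dependencies
        self.downstream = []                       # direct children

    def notify_upstream_done(self, upstream_id, ready_queue):
        # O(1) per notification, versus an aggregation query per task
        # per loop in the polling approach.
        self.pending_upstream.discard(upstream_id)
        if not self.pending_upstream:
            ready_queue.append(self.task_id)  # now schedulable

def on_task_success(task, ready_queue):
    # Called once, when a task instance reaches SUCCESS.
    for child in task.downstream:
        child.notify_upstream_done(task.task_id, ready_queue)

# Usage: a -> b, a -> c, b -> c
a, b, c = TaskNode("a", []), TaskNode("b", ["a"]), TaskNode("c", ["a", "b"])
a.downstream, b.downstream = [b, c], [c]

ready = []
on_task_success(a, ready)   # ready == ["b"]; c still waits on b
on_task_success(b, ready)   # ready == ["b", "c"]

The same shape also maps onto the MQ idea from the thread: instead of appending to an in-process ready_queue, the notification could enqueue the now-runnable TaskInstance for workers to pick up, leaving the scheduler to do garbage collection and consistency checks.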
