I've done something similar to this with triggers in MySQL. It's hard to do
this in a generic way, though. Every SQL DB has its own way of expressing
procedures (or none at all).
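
For illustration only, a minimal sketch of the trigger idea. It uses SQLite
through Python's sqlite3 module rather than MySQL, so it runs without a
server, and MySQL's trigger syntax differs. The table and column names
(task_instance, edge, pending_upstream) are hypothetical and are not
Airflow's schema. It only models an "all upstream succeeded" rule.

```python
import sqlite3

# Hypothetical schema, not Airflow's: each task keeps a counter of
# upstream tasks that have not yet succeeded.
SCHEMA = """
CREATE TABLE task_instance (
    task_id TEXT PRIMARY KEY,
    state TEXT NOT NULL DEFAULT 'pending',
    pending_upstream INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE edge (
    upstream TEXT NOT NULL,
    downstream TEXT NOT NULL
);
-- When a task flips to 'success', decrement the counter of every
-- directly downstream task instead of re-aggregating later.
CREATE TRIGGER notify_downstream
AFTER UPDATE OF state ON task_instance
WHEN NEW.state = 'success' AND OLD.state <> 'success'
BEGIN
    UPDATE task_instance
    SET pending_upstream = pending_upstream - 1
    WHERE task_id IN (
        SELECT downstream FROM edge WHERE upstream = NEW.task_id
    );
END;
"""


def build_db(edges):
    """Create an in-memory DB from a list of (upstream, downstream) pairs."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    tasks = {task for pair in edges for task in pair}
    for task in tasks:
        n_upstream = sum(1 for _, down in edges if down == task)
        conn.execute(
            "INSERT INTO task_instance (task_id, pending_upstream) VALUES (?, ?)",
            (task, n_upstream),
        )
    conn.executemany("INSERT INTO edge VALUES (?, ?)", edges)
    return conn


def mark_success(conn, task_id):
    """Set a task to 'success'; the trigger updates downstream counters."""
    conn.execute(
        "UPDATE task_instance SET state = 'success' WHERE task_id = ?",
        (task_id,),
    )


def runnable(conn):
    """Pending tasks with no outstanding upstream tasks, without aggregation."""
    rows = conn.execute(
        "SELECT task_id FROM task_instance "
        "WHERE state = 'pending' AND pending_upstream = 0 ORDER BY task_id"
    )
    return [row[0] for row in rows]
```

The point of the counter is that finding runnable tasks becomes a plain
filter on one column; the cost moves to the state change itself.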

On Sat, Jun 4, 2016 at 3:32 PM, Lance Norskog <[email protected]>
wrote:

> Does anybody have experience with writing this kind of state change code in
> stored procedures?
>
> On Sat, Jun 4, 2016 at 6:39 AM, Bolke de Bruin <[email protected]> wrote:
>
> >
> > > On 4 Jun 2016, at 03:16, Maxime Beauchemin <[email protected]> wrote:
> > >
> > > Caching is a last resort solution and probably not a good thing here.
> > > It would introduce lag and confusion.
> > >
> >
> > Agreed.
> >
> > > You seem to say that some things are evaluated twice within a scheduler
> > > cycle? What would that be?
> >
> > In the old scheduler (before AIRFLOW-128, the refactor of process_dag) it
> > was actually 3 times, although the impact was probably closer to 2 times
> > due to the query cache:
> >
> > 1. get_active_runs was calling ti.are_dependencies_met twice: first as
> > part of the overall deadlock check and, because the results were not
> > stored, again for the individual dag_run deadlock check.
> > 2. process_dag calls ti.is_runnable(), which itself also calls
> > ti.are_dependencies_met().
> >
> > In the new version DagRun.update_state() gets called, which is more or less
> > a copy of get_active_runs except that I store the results of the deadlock
> > checks. process_dag still calls ti.is_runnable() at the moment.
> >
> > >
> > > Another option is to reduce the number of database interactions and make
> > > sure indexes are in place and balanced (you may want to rebuild your
> > > indexes and make sure they are getting used). If the DB is the point of
> > > contention, we can assume that getting a fast DB, or rationalizing
> > > overall DB interactions (less frequent heartbeats?) would also help.
> >
> > I actually checked whether this would make a difference as part of
> > AIRFLOW-128. It doesn’t really. Aggregation functions always require a
> > table scan and ti.are_dependencies_met() uses 5 of them (though they
> > probably get optimized to one).
> >
> > >
> > > Another option is *distributed scheduling*, where the worker, after
> > > finishing a task instance, would evaluate [only] the directly
> > > downstream tasks for execution. In theory the scheduler could just seed
> > > the DAG and the downstream tasks would be triggered by their parents
> > > immediately after, almost like *callbacks*. Then it matters a lot less
> > > how long scheduling cycles actually take…
> >
> >
> > Absolutely. I am tinkering with having a “notify_downstream” function in
> > the task instances that calls an “update” function on the downstream task.
> > It is conceptually quite a small step to check whether the trigger rules
> > have been met and then notify the scheduler that the task can be scheduled.
> > If you think about it further, you could probably remove the scheduler
> > logic for determining which task gets scheduled by putting the
> > TaskInstance on an MQ and letting the workers decide which one they can
> > pick up. The scheduler would then just do some garbage collection. I think
> > this could be the next step, but not one I am prepared to undertake right
> > now. I think a large speed-up can be gotten from the initial suggestion. I
> > am working on a PoC that will show the difference in speed.
> >
> > >
> > > Max
> > >
> > > On Fri, Jun 3, 2016 at 2:26 PM, Bolke de Bruin <[email protected]> wrote:
> > >
> > >> Hi,
> > >>
> > >> I am looking at speeding up the scheduler. Currently loop times increase
> > >> with the number of tasks in a dag. This is due to
> > >> TaskInstance.are_dependencies_met executing several aggregation
> > >> functions on the database. These calls are expensive: between 0.05 and
> > >> 0.15s per task, and for every scheduler loop this gets called twice.
> > >> This call is where the scheduler spends around 90% of its time when
> > >> evaluating dags, and it is the reason people with a large number of
> > >> tasks per dag see quite large loop times (north of 600s).
> > >>
> > >> I see 2 options to optimize the loop without going to a multiprocessing
> > >> approach, which would just push the problem down the line (i.e. to the
> > >> db, or to when you don’t have enough cores anymore).
> > >>
> > >> 1. Cache the call to TI.are_dependencies_met, either by caching in
> > >> something like memcache or by removing the need for the double call
> > >> (update_state and process_dag both make the call to
> > >> TI.are_dependencies_met). This would more or less cut the time in half.
> > >>
> > >> 2. Notify the downstream tasks of a state change of an upstream task.
> > >> This would remove the need for the aggregation as the task would just
> > >> ‘know’. It is a bit harder to implement correctly as you need to make
> > >> sure you stay in a consistent state. Obviously you could still run an
> > >> integrity check once in a while. This option would make the aggregation
> > >> event based and significantly reduce the time spent here to around 1-5%
> > >> of the current scheduler. There is a slight overhead added at a state
> > >> change of the TaskInstance (managed by the TaskInstance itself).
> > >>
> > >> What do you think? My preferred option is #2. Am I missing any other
> > >> options? Are scheduler loop times a concern at all?
> > >>
> > >> Thanks
> > >> Bolke
> > >>
> > >>
> > >>
> >
> >
>
>
> --
> Lance Norskog
> [email protected]
> Redwood City, CA
>
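
A rough sketch of what option 2 in the quoted message might look like in
plain Python, kept outside the database. The Task class, its fields, and
integrity_check are hypothetical names for illustration and are not
Airflow's TaskInstance API. Only an "all upstream succeeded" rule is
modelled; other trigger rules would need their own counters.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)
class Task:
    """Hypothetical stand-in for a task instance; not Airflow's class."""

    task_id: str
    state: str = "pending"
    downstream: list = field(default_factory=list)
    # Number of direct upstream tasks that have not yet succeeded.
    pending_upstream: int = 0

    def set_downstream(self, other):
        """Register `other` as directly downstream of this task."""
        self.downstream.append(other)
        other.pending_upstream += 1

    def notify_downstream(self):
        """Tell children one upstream finished; return those now ready."""
        ready = []
        for child in self.downstream:
            child.pending_upstream -= 1
            if child.pending_upstream == 0:
                ready.append(child)
        return ready

    def mark_success(self):
        """State change plus notification; a repeat call is a no-op."""
        if self.state == "success":
            return []
        self.state = "success"
        return self.notify_downstream()


def integrity_check(tasks):
    """Recount from scratch and compare with the event-maintained counters."""
    expected = {task.task_id: 0 for task in tasks}
    for task in tasks:
        if task.state != "success":
            for child in task.downstream:
                expected[child.task_id] += 1
    return all(t.pending_upstream == expected[t.task_id] for t in tasks)
```

The occasional integrity_check is the full recount the current scheduler
does on every loop; here it would only run as a safety net.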
