Does anybody have experience with writing this kind of state change code in
stored procedures?
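
To make the question concrete, here is a rough sketch of what database-side state change logic might look like for option 2 in the quoted thread. SQLite has no stored procedures, so this uses a trigger instead. The `task` and `edge` tables, their columns, and the trigger name are all hypothetical and are not Airflow's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Hypothetical schema: one row per task, one row per upstream->downstream edge.
CREATE TABLE task (
    task_id          TEXT PRIMARY KEY,
    state            TEXT,
    upstream_total   INTEGER DEFAULT 0,
    upstream_success INTEGER DEFAULT 0
);
CREATE TABLE edge (upstream TEXT, downstream TEXT);

-- When a task moves to 'success', bump the counter on each direct child.
CREATE TRIGGER task_succeeded
AFTER UPDATE OF state ON task
WHEN NEW.state = 'success' AND OLD.state IS NOT 'success'
BEGIN
    UPDATE task
    SET upstream_success = upstream_success + 1
    WHERE task_id IN (SELECT downstream FROM edge WHERE upstream = NEW.task_id);
END;
""")

conn.executemany(
    "INSERT INTO task (task_id, upstream_total) VALUES (?, ?)",
    [("a", 0), ("b", 0), ("c", 2)],
)
conn.executemany("INSERT INTO edge VALUES (?, ?)", [("a", "c"), ("b", "c")])

# Finishing the two parents updates the child's counter with no aggregation.
conn.execute("UPDATE task SET state = 'success' WHERE task_id = 'a'")
conn.execute("UPDATE task SET state = 'success' WHERE task_id = 'b'")

# Tasks that have not run yet and whose parents have all succeeded.
ready = [
    row[0]
    for row in conn.execute(
        "SELECT task_id FROM task "
        "WHERE state IS NULL AND upstream_success = upstream_total"
    )
]
```

The counter update runs in the same transaction as the state change, which is the main appeal of pushing this into the database: the counts cannot drift from the states unless something bypasses the trigger.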

On Sat, Jun 4, 2016 at 6:39 AM, Bolke de Bruin <[email protected]> wrote:

>
> > On Jun 4, 2016, at 03:16, Maxime Beauchemin <[email protected]> wrote:
> >
> > Caching is a last resort solution and probably not a good thing here. It
> > would introduce lag and confusion.
> >
>
> Agreed.
>
> > You seem to say that some things are evaluated twice within a scheduler
> > cycle? What would that be?
>
> In the old scheduler (before AIRFLOW-128, the refactor of process_dag) it
> was actually 3 times, although the impact was probably closer to 2 times
> due to the query cache:
>
> 1. get_active_runs was calling ti.are_dependencies_met twice. First as
> part of the overall deadlock check, and because the results were not
> stored, again for the individual dag_run deadlock check.
> 2. process_dag calls ti.is_runnable(), which in turn also calls
> ti.are_dependencies_met()
>
> In the new version DagRun.update_state() gets called which is more or less
> a copy of get_active_runs except for the fact that I store the deadlock
> checks. process_dag still calls ti.is_runnable() at the moment.
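
Storing the result of a check for the duration of a single scheduler loop, as described above, is different from a cross-loop cache and should not introduce lag. A minimal sketch of the idea follows. `LoopCache`, `expensive_check`, and the task keys are hypothetical names for illustration and are not Airflow APIs.

```python
from typing import Callable, Dict, Hashable


class LoopCache:
    """Remembers dependency-check results for one scheduler loop only."""

    def __init__(self, check: Callable[[Hashable], bool]) -> None:
        self._check = check  # the expensive check, e.g. a DB aggregation
        self._results: Dict[Hashable, bool] = {}
        self.calls = 0  # how many times the expensive check really ran

    def deps_met(self, ti_key: Hashable) -> bool:
        # Run the expensive check at most once per task instance per loop.
        if ti_key not in self._results:
            self.calls += 1
            self._results[ti_key] = self._check(ti_key)
        return self._results[ti_key]


def expensive_check(ti_key: Hashable) -> bool:
    # Stand-in for ti.are_dependencies_met(); the rule here is made up.
    return ti_key != "task_c"


# Create a fresh cache at the start of every loop so nothing goes stale.
cache = LoopCache(expensive_check)
for phase in ("update_state", "process_dag"):
    runnable = [k for k in ("task_a", "task_b", "task_c") if cache.deps_met(k)]
```

Both phases ask about the same three task instances, but the expensive check runs only once per instance. Throwing the cache away at the end of the loop keeps the results from outliving the state they were computed from.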
>
> >
> > Another option is to reduce the number of database interactions and make
> > sure indexes are in place and balanced (you may want to rebuild your
> > indexes and make sure they are getting used). If the DB is the point of
> > contention, we can assume that getting a fast DB, or rationalizing overall
> > DB interactions (less frequent heartbeats?) would also help.
>
> I actually verified whether this would make a difference as part of
> AIRFLOW-128. It doesn’t really. Aggregation functions always require a
> table scan and ti.are_dependencies_met() uses 5 of them (though they
> probably get optimized to one).
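
For readers unfamiliar with the pattern, several aggregates over the same rows can be written as a single query with conditional sums, so the table is read once. The sketch below uses SQLite and a made-up `task_instance` table; the four state names and the choice of five aggregates are assumptions for illustration, not a copy of the real query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [
        ("a", "success"),
        ("b", "success"),
        ("c", "failed"),
        ("d", "skipped"),
        ("e", "upstream_failed"),
        ("f", "running"),  # not an upstream task, filtered out below
    ],
)

upstream_ids = ["a", "b", "c", "d", "e"]
placeholders = ",".join("?" * len(upstream_ids))

# Five aggregates computed in one pass over the matching rows.
row = conn.execute(
    f"""
    SELECT
        COALESCE(SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END), 0),
        COALESCE(SUM(CASE WHEN state = 'skipped' THEN 1 ELSE 0 END), 0),
        COALESCE(SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END), 0),
        COALESCE(SUM(CASE WHEN state = 'upstream_failed' THEN 1 ELSE 0 END), 0),
        COUNT(*)
    FROM task_instance
    WHERE task_id IN ({placeholders})
    """,
    upstream_ids,
).fetchone()

successes, skipped, failed, upstream_failed, total = row
```

Even in this form the query still has to visit every upstream row each time it runs, which is why the cost grows with the number of tasks.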
>
> >
> > Another option is *distributed scheduling* where the worker, after
> > finishing a task instance, would evaluate [only] the directly
> > downstream tasks for execution. In theory the scheduler could just seed the
> > DAG and the downstream tasks would be triggered by their parents, immediately
> > after, almost like *callbacks*. Then it matters a lot less how long
> > scheduling cycles actually take…
>
>
> Absolutely. I am tinkering with having a “notify_downstream” function in
> the task instances that calls an “update” function on the downstream task. It
> is conceptually quite a small step to check if the trigger rules have been
> met and then notify the scheduler that the task can be scheduled. If you
> think about it further you could probably remove the scheduler logic for
> determining which task gets scheduled by putting the TaskInstance on an MQ
> and letting the workers decide which one they can pick up. The scheduler would
> then just do some garbage collection. I think this could be the next step,
> but not one I am prepared to undertake right now. I think a large speedup can
> be gotten from the initial suggestion. I am working on a PoC that will
> show the difference in speed.
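
A toy version of the “notify_downstream” / “update” idea described above might look like the following. It is only a sketch: the `Task` class, its counters, the `ready` list standing in for "notify the scheduler", and the single all-parents-succeeded rule are assumptions made for illustration, not the PoC mentioned in the message.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Task:
    name: str
    state: str = "none"
    upstream_total: int = 0    # number of direct parents
    upstream_success: int = 0  # parents that have reported success so far
    downstream: List["Task"] = field(default_factory=list)

    def set_downstream(self, other: "Task") -> None:
        self.downstream.append(other)
        other.upstream_total += 1

    def finish(self, state: str, ready: List[str]) -> None:
        # A state change is the only moment any work happens.
        self.state = state
        self.notify_downstream(ready)

    def notify_downstream(self, ready: List[str]) -> None:
        for child in self.downstream:
            child.update(self.state, ready)

    def update(self, upstream_state: str, ready: List[str]) -> None:
        # Keep a running count instead of aggregating over all parents.
        if upstream_state == "success":
            self.upstream_success += 1
        # Hypothetical trigger rule: every direct parent must have succeeded.
        if self.upstream_success == self.upstream_total:
            ready.append(self.name)


a, b, c = Task("a"), Task("b"), Task("c")
a.set_downstream(c)
b.set_downstream(c)

ready: List[str] = []  # stands in for a message to the scheduler
a.finish("success", ready)
after_first = list(ready)
b.finish("success", ready)
```

The child becomes ready only after the second parent reports in, and no query over all task instances is needed to find that out. What the sketch leaves out is the hard part raised in the thread: keeping the counters consistent when a parent is retried, cleared, or fails partway through.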
>
> >
> > Max
> >
> > On Fri, Jun 3, 2016 at 2:26 PM, Bolke de Bruin <[email protected]> wrote:
> >
> >> Hi,
> >>
> >> I am looking at speeding up the scheduler. Currently loop times increase
> >> with the number of tasks in a dag. This is due to
> >> TaskInstance.are_dependencies_met executing several aggregation functions on
> >> the database. These calls are expensive: between 0.05 and 0.15s per task, and
> >> in every scheduler loop this gets called twice. This call is where the
> >> scheduler spends around 90% of its time when evaluating dags and is the
> >> reason people with a large number of tasks per dag see quite
> >> large loop times (north of 600s).
> >>
> >> I see 2 options to optimize the loop without going to a multiprocessing
> >> approach, which would just push the problem down the line (i.e. to the db,
> >> or to the point where you don’t have enough cores anymore).
> >>
> >> 1. Cache the call to TI.are_dependencies_met, either by caching in
> >> something like memcache or by removing the need for the double call
> >> (update_state and process_dag both make the call to
> >> TI.are_dependencies_met). This would more or less cut the time in half.
> >>
> >> 2. Notify the downstream tasks of a state change of an upstream task. This
> >> would remove the need for the aggregation as the task would just ‘know’. It
> >> is a bit harder to implement correctly as you need to make sure you stay
> >> in a consistent state. Obviously you could still run an integrity
> >> check once in a while. This option would make the aggregation event based
> >> and significantly reduce the time spent here to around 1-5% of the current
> >> scheduler. There is a slight overhead added at a state change of the
> >> TaskInstance (managed by the TaskInstance itself).
> >>
> >> What do you think? My preferred option is #2. Am I missing any other
> >> options? Are scheduler loop times a concern at all?
> >>
> >> Thanks
> >> Bolke
> >>
> >>
> >>
>
>


-- 
Lance Norskog
[email protected]
Redwood City, CA
