Teaser:

2016-06-05 18:38:33,371 INFO - Loop took: 0.022339 seconds
2016-06-05 18:38:38,034 INFO - Loop took: 0.019446 seconds
2016-06-05 18:38:43,024 INFO - Loop took: 0.013158 seconds
2016-06-05 18:38:48,166 INFO - Loop took: 0.152832 seconds
2016-06-05 18:38:53,173 INFO - Loop took: 0.163242 seconds
2016-06-05 18:38:58,244 INFO - Loop took: 0.232048 seconds
2016-06-05 18:39:03,520 INFO - Loop took: 0.51223 seconds
2016-06-05 18:39:08,887 INFO - Loop took: 0.878129 seconds
2016-06-05 18:39:14,312 INFO - Loop took: 1.303849 seconds
2016-06-05 18:39:22,404 INFO - Loop took: 4.395173 seconds
2016-06-05 18:39:23,816 INFO - Loop took: 0.811369 seconds
2016-06-05 18:39:30,144 INFO - Loop took: 2.135294 seconds
2016-06-05 18:39:36,488 INFO - Loop took: 3.481462 seconds
2016-06-05 18:39:38,704 INFO - Loop took: 0.698312 seconds
2016-06-05 18:39:46,486 INFO - Loop took: 3.474636 seconds
2016-06-05 18:39:48,816 INFO - Loop took: 0.808555 seconds
2016-06-05 18:39:56,852 INFO - Loop took: 3.840377 seconds
2016-06-05 18:39:58,936 INFO - Loop took: 0.924968 seconds
2016-06-05 18:40:07,392 INFO - Loop took: 4.381813 seconds
2016-06-05 18:40:15,520 INFO - Loop took: 7.503047 seconds
2016-06-05 18:40:15,912 INFO - Loop took: 0.333375 seconds
2016-06-05 18:40:21,580 INFO - Loop took: 0.563334 seconds

This is from a fully event-driven scheduler working on the same dag as 
mentioned in AIRFLOW-128. In the non-event-driven scheduler this was taking 
30-60s around peak times. In my approach I let TIs decide for themselves whether 
they are ready to be scheduled (new state “READY”). They set this state when they 
receive an upstream event that makes evaluate_trigger_rule return true. Obviously, 
this requires some kickstarter TIs, which I obtain from the dag by looking for the 
TIs that don’t have any upstream dependencies.
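
A rough sketch of the mechanism (very much a sketch; the names below are
placeholders and not necessarily what ends up in the branch):

READY = "ready"  # hypothetical new TaskInstance state


class EventTI(object):
    def __init__(self, task_id, upstream=()):
        self.task_id = task_id
        self.upstream = set(upstream)   # task_ids of upstream TIs
        self.received = set()           # upstream events seen so far
        self.state = None

    def evaluate_trigger_rule(self):
        # stand-in for the real trigger rule evaluation (all_success etc.)
        return self.received == self.upstream

    def on_upstream_event(self, upstream_task_id, ready_queue):
        # called when an upstream TI reaches a terminal state
        self.received.add(upstream_task_id)
        if self.evaluate_trigger_rule():
            self.state = READY
            ready_queue.append(self)    # the scheduler only looks at READY TIs


def kickstart(task_instances, ready_queue):
    # the kickstarter TIs: those without any upstream dependencies
    for ti in task_instances:
        if not ti.upstream:
            ti.state = READY
            ready_queue.append(ti)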

Very, very early hacks are in my “event_ti” branch on GitHub.

Bolke

> On 4 Jun 2016, at 15:39, Bolke de Bruin <[email protected]> 
> wrote:
> 
> 
>> On 4 Jun 2016, at 03:16, Maxime Beauchemin 
>> <[email protected]> wrote:
>> 
>> Caching is a last resort solution and probably not a good thing here. It
>> would introduce lag and confusion.
>> 
> 
> Agreed.
> 
>> You seem to say that some things are evaluated twice within a scheduler cycle?
>> What would those be?
> 
> In the old scheduler (before AIRFLOW-128, the refactor of process_dag) it was 
> actually 3 times, although the impact was probably closer to 2 times due 
> to the query cache:
> 
> 1. get_active_runs was calling ti.are_dependencies_met twice: first as part 
> of the overall deadlock check and, because the results were not stored, again 
> for the individual dag_run deadlock check.
> 2. process_dag calls ti.is_runnable(), which in turn also calls 
> ti.are_dependencies_met().
> 
> In the new version DagRun.update_state() gets called, which is more or less a 
> copy of get_active_runs except that I store the results of the deadlock checks. 
> process_dag still calls ti.is_runnable() at the moment.
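> 
> Conceptually the change in update_state is just evaluating once and reusing 
> the result (simplified, not the actual code):
> 
> # evaluate dependencies once per unfinished TI and remember the result
> deps_met = {}
> for ti in unfinished_tis:
>     deps_met[ti.task_id] = ti.are_dependencies_met()
> 
> # both the overall and the per-dag_run deadlock check reuse deps_met
> # instead of calling are_dependencies_met() again
> deadlocked = bool(unfinished_tis) and not any(deps_met.values())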
> 
>> 
>> Another option is to reduce the number of database interaction and make
>> sure indexes are in place and balanced (you may want to rebuild your
>> indexes and make sure they are getting used). If the DB is the point of
>> contention, we can assume that getting a fast DB, or rationalizing overall
>> DB interactions (less frequent heartbeats?) would also help.
> 
> I actually verified whether this would make a difference as part of AIRFLOW-128. 
> It doesn’t, really. Aggregation functions always require a table scan, and 
> ti.are_dependencies_met() uses 5 of them (though they probably get optimized 
> down to one).
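> 
> To give an idea of the kind of query a single dependency check boils down to 
> (simplified, assuming the usual SQLAlchemy session and TaskInstance model from 
> the scheduler code):
> 
> from sqlalchemy import func
> 
> # count upstream TIs per state for this execution_date; are_dependencies_met
> # runs several of these aggregates per TI per loop
> qry = (
>     session.query(TaskInstance.state, func.count(TaskInstance.task_id))
>     .filter(
>         TaskInstance.dag_id == ti.dag_id,
>         TaskInstance.task_id.in_(upstream_task_ids),
>         TaskInstance.execution_date == ti.execution_date,
>     )
>     .group_by(TaskInstance.state)
> )
> state_counts = dict(qry.all())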
> 
>> 
>> Another option is *distributed scheduling* where as part of the worker
>> after finishing a task instance it would evaluate [only] the directly
>> downstream tasks for execution. In theory the scheduler could just seed the
>> DAG and the downstream tasks be triggered by their parents, immediately
>> after, almost like *callbacks*. Then it matters a lot less how long
>> scheduling cycles actually take…
> 
> 
> Absolutely. I am tinkering with having a “notify_downstream” function in the 
> task instances that calls an “update” function on the downstream tasks. It is 
> conceptually quite a small step to check whether the trigger rules have been met 
> and then notify the scheduler that the task can be scheduled. If you take it 
> a step further, you could probably remove the scheduler logic for 
> determining which task gets scheduled by putting the TaskInstance on an MQ 
> and letting the workers decide which one they can pick up. The scheduler would 
> then just do some garbage collection. I think this could be the next step, 
> but not one I am prepared to undertake right now. I think a large speed-up can 
> be gained from the initial suggestion. I am working on a PoC that will make it 
> possible to see the difference in speed.
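> 
> Very roughly what I have in mind (nothing of this exists yet, all names are 
> made up):
> 
> def notify_downstream(finished_ti, mq):
>     # called once finished_ti reaches a terminal state
>     for downstream_ti in finished_ti.get_direct_downstream():
>         downstream_ti.update(finished_ti)        # record the upstream event
>         if downstream_ti.trigger_rule_met():
>             # no scheduling decision left to make: publish the TI and let a
>             # worker pick it up; the scheduler only does garbage collection
>             mq.publish("task_instances", downstream_ti.key)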
> 
>> 
>> Max
>> 
>> On Fri, Jun 3, 2016 at 2:26 PM, Bolke de Bruin <[email protected]> wrote:
>> 
>>> Hi,
>>> 
>>> I am looking at speeding up the scheduler. Currently loop times increase
>>> with the number of tasks in a dag. This is due to
>>> TaskInstance.are_dependencies_met executing several aggregation functions on
>>> the database. These calls are expensive: between 0.05-0.15s per task, and
>>> for every scheduler loop this gets called twice. This call is where the
>>> scheduler spends around 90% of its time when evaluating dags, and it is the
>>> reason that people with a large number of tasks per dag see quite
>>> large loop times (north of 600s).
>>> 
>>> I see 2 options to optimize the loop without going to a multiprocessing
>>> approach, which would just push the problem down the line (i.e. to the db, or
>>> to the point where you don’t have enough cores anymore).
>>> 
>>> 1. Cache the call to TI.are_dependencies_met, either by caching the result in
>>> something like memcache (rough sketch below, after option 2) or by removing
>>> the need for the double call (update_state and process_dag both call
>>> TI.are_dependencies_met). This would more or less cut the time in half.
>>> 
>>> 2. Notify the downstream tasks of a state change of an upstream task. This
>>> would remove the need for the aggregation as the task would just ‘know’. It
>>> is a bit harder to implement correctly, as you need to make sure you remain
>>> in a consistent state. Obviously you could still run an integrity
>>> check once in a while. This option would make the aggregation event-based
>>> and significantly reduce the time spent here to around 1-5% of the current
>>> scheduler. There is a slight overhead added at each state change of the
>>> TaskInstance (managed by the TaskInstance itself).
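>>> 
>>> For option 1, the memcache variant could be as simple as something like this
>>> (just a sketch, key layout and TTL picked arbitrarily):
>>> 
>>> import memcache
>>> 
>>> mc = memcache.Client(["127.0.0.1:11211"])
>>> 
>>> def cached_are_dependencies_met(ti, ttl=30):
>>>     # cache the expensive aggregation for a short while per TI
>>>     key = "deps:%s:%s:%s" % (
>>>         ti.dag_id, ti.task_id, ti.execution_date.isoformat())
>>>     hit = mc.get(key)
>>>     if hit is not None:
>>>         return hit == "1"
>>>     result = ti.are_dependencies_met()
>>>     mc.set(key, "1" if result else "0", time=ttl)
>>>     return result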
>>> 
>>> What do you think? My preferred option is #2. Am I missing any other
>>> options? Are scheduler loop times a concern at all?
>>> 
>>> Thanks
>>> Bolke
>>> 
>>> 
>>> 
> 
