Teaser:

2016-06-05 18:38:33,371 INFO - Loop took: 0.022339 seconds
2016-06-05 18:38:38,034 INFO - Loop took: 0.019446 seconds
2016-06-05 18:38:43,024 INFO - Loop took: 0.013158 seconds
2016-06-05 18:38:48,166 INFO - Loop took: 0.152832 seconds
2016-06-05 18:38:53,173 INFO - Loop took: 0.163242 seconds
2016-06-05 18:38:58,244 INFO - Loop took: 0.232048 seconds
2016-06-05 18:39:03,520 INFO - Loop took: 0.51223 seconds
2016-06-05 18:39:08,887 INFO - Loop took: 0.878129 seconds
2016-06-05 18:39:14,312 INFO - Loop took: 1.303849 seconds
2016-06-05 18:39:22,404 INFO - Loop took: 4.395173 seconds
2016-06-05 18:39:23,816 INFO - Loop took: 0.811369 seconds
2016-06-05 18:39:30,144 INFO - Loop took: 2.135294 seconds
2016-06-05 18:39:36,488 INFO - Loop took: 3.481462 seconds
2016-06-05 18:39:38,704 INFO - Loop took: 0.698312 seconds
2016-06-05 18:39:46,486 INFO - Loop took: 3.474636 seconds
2016-06-05 18:39:48,816 INFO - Loop took: 0.808555 seconds
2016-06-05 18:39:56,852 INFO - Loop took: 3.840377 seconds
2016-06-05 18:39:58,936 INFO - Loop took: 0.924968 seconds
2016-06-05 18:40:07,392 INFO - Loop took: 4.381813 seconds
2016-06-05 18:40:15,520 INFO - Loop took: 7.503047 seconds
2016-06-05 18:40:15,912 INFO - Loop took: 0.333375 seconds
2016-06-05 18:40:21,580 INFO - Loop took: 0.563334 seconds
This is from a fully event-driven scheduler working on the same DAG as mentioned in AIRFLOW-128. In the non-event-driven scheduler this was taking 30-60s around peak times.

In my approach I let TIs decide if they are ready to be scheduled (new state "READY"). They set this state when they receive an upstream event that makes evaluate_trigger_rule return true. Obviously, this requires some kickstarter TIs, which I obtain from the DAG by looking for the TIs that have no upstream. Very, very early hacks are in my "event_ti" branch on GitHub.

Bolke

> On 4 Jun 2016, at 15:39, Bolke de Bruin <[email protected]> wrote:
>
>> On 4 Jun 2016, at 03:16, Maxime Beauchemin <[email protected]> wrote:
>>
>> Caching is a last-resort solution and probably not a good thing here. It would introduce lag and confusion.
>
> Agreed.
>
>> You seem to say that some things are evaluated twice within a scheduler cycle? What would that be?
>
> In the old scheduler (before AIRFLOW-128, the refactor of process_dag) it was actually 3 times, although the impact was probably around 2 times due to the query cache:
>
> 1. get_active_runs was calling ti.are_dependencies_met twice: first as part of the overall deadlock check, and, because the results were not stored, again for the individual dag_run deadlock check.
> 2. process_dag calls ti.is_runnable(), which in turn also calls ti.are_dependencies_met().
>
> In the new version DagRun.update_state() gets called, which is more or less a copy of get_active_runs except that I store the deadlock checks. process_dag still calls ti.is_runnable() at the moment.
>
>> Another option is to reduce the number of database interactions and make sure indexes are in place and balanced (you may want to rebuild your indexes and make sure they are getting used). If the DB is the point of contention, we can assume that getting a fast DB, or rationalizing overall DB interactions (less frequent heartbeats?), would also help.
>
> I actually verified whether this would make a difference as part of AIRFLOW-128. It doesn't really. Aggregation functions always require a table scan, and ti.are_dependencies_met() uses 5 of them (though they probably get optimized to one).
>
>> Another option is *distributed scheduling* where, as part of the worker, after finishing a task instance it would evaluate [only] the directly downstream tasks for execution. In theory the scheduler could just seed the DAG and the downstream tasks would be triggered by their parents, immediately after, almost like *callbacks*. Then it matters a lot less how long scheduling cycles actually take…
>
> Absolutely. I am tinkering with having a "notify_downstream" function in the task instances that calls an "update" function on the downstream task. It is conceptually quite a small step to check if the trigger rules have been met and then notify the scheduler that the task can be scheduled. If you think about it further, you could probably remove the scheduler logic for determining which task gets scheduled by putting the TaskInstance on an MQ and letting the workers decide which one they can pick up. The scheduler would then just do some garbage collection. I think this could be the next step, not one I am prepared to undertake right now. I think a large speed-up can be gained from the initial suggestion. I am working on a PoC that will allow us to see the difference in speed.
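To make the READY/notify idea a bit more concrete, here is a very rough, standalone sketch in plain Python. All names are simplified and hypothetical, it is not the actual code in the event_ti branch, and the trigger-rule check is reduced to "all upstream succeeded":

# Rough sketch only: simplified, hypothetical names, not the event_ti branch.

NONE, READY, SUCCESS = "none", "ready", "success"


class TI(object):
    def __init__(self, task_id, upstream=None):
        self.task_id = task_id
        self.upstream = list(upstream or [])   # upstream TI objects
        self.downstream = []                   # wired up by the caller
        self.state = NONE

    def evaluate_trigger_rule(self):
        # Simplified "all_success" rule; a real check handles every trigger rule.
        return all(t.state == SUCCESS for t in self.upstream)

    def handle_upstream_event(self, upstream_ti):
        # Called by an upstream TI on its state change: no DB aggregation needed,
        # the TI decides for itself whether it is ready to be scheduled.
        if self.state == NONE and self.evaluate_trigger_rule():
            self.state = READY

    def set_success(self):
        self.state = SUCCESS
        self.notify_downstream()

    def notify_downstream(self):
        for ti in self.downstream:
            ti.handle_upstream_event(self)


def kickstart(tis):
    # "Kickstarter" TIs: the ones without any upstream become READY immediately.
    for ti in tis:
        if not ti.upstream and ti.state == NONE:
            ti.state = READY


# Tiny example: a -> b. The scheduler then only has to pick up READY TIs.
a = TI("a")
b = TI("b", upstream=[a])
a.downstream.append(b)
kickstart([a, b])   # a is now READY
a.set_success()     # b becomes READY via the upstream event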
>
>> Max
>>
>> On Fri, Jun 3, 2016 at 2:26 PM, Bolke de Bruin <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I am looking at speeding up the scheduler. Currently loop times increase with the number of tasks in a DAG. This is due to TaskInstance.are_dependencies_met executing several aggregation functions on the database. These calls are expensive: between 0.05 and 0.15s per task, and for every scheduler loop this gets called twice. This call is where the scheduler spends around 90% of its time when evaluating DAGs, and it is the reason that people with a large number of tasks per DAG see quite large loop times (north of 600s).
>>>
>>> I see 2 options to optimize the loop without going to a multiprocessing approach, which would just push the problem down the line (i.e. to the DB, or to when you don't have enough cores anymore).
>>>
>>> 1. Cache the call to TI.are_dependencies_met, by either caching it in something like memcache or removing the need for the double call (update_state and process_dag both make the call to TI.are_dependencies_met). This would more or less cut the time in half.
>>>
>>> 2. Notify the downstream tasks of a state change of an upstream task. This would remove the need for the aggregation as the task would just 'know'. It is a bit harder to implement correctly as you need to make sure you stay in a consistent state. Obviously you could still run an integrity check once in a while. This option would make the aggregation event-based and significantly reduce the time spent here to around 1-5% of the current scheduler. There is a slight overhead added at a state change of the TaskInstance (managed by the TaskInstance itself).
>>>
>>> What do you think? My preferred option is #2. Am I missing any other options? Are scheduler loop times a concern at all?
>>>
>>> Thanks
>>> Bolke
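As a footnote on option 1: removing the double call essentially amounts to memoizing the dependency check for the duration of a single scheduler loop. A minimal sketch of that idea; the DepsCache wrapper and its wiring are hypothetical, only TaskInstance.are_dependencies_met() is the existing method:

# Hypothetical helper, not actual Airflow code: memoize the expensive
# dependency check so update_state and process_dag share one evaluation
# per task instance within a single scheduler loop.

class DepsCache(object):
    def __init__(self):
        self._results = {}

    def are_dependencies_met(self, ti):
        key = (ti.dag_id, ti.task_id, ti.execution_date)
        if key not in self._results:
            # The aggregation queries now run at most once per TI per loop.
            self._results[key] = ti.are_dependencies_met()
        return self._results[key]

# Per scheduler loop: create one DepsCache, pass it to both the dag_run
# state update and process_dag, and call cache.are_dependencies_met(ti)
# instead of calling the TaskInstance method directly.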
