milletstephane commented on issue #15654:
URL: https://github.com/apache/airflow/issues/15654#issuecomment-833523962
If, like me, you are stuck with an old version of Airflow, here is an update on the problem.
My assumption that the problem was linked to the time taken to establish a connection with the database turned out to be wrong.
It also seems that, rather than a bug, this behaviour is a design choice: the scheduler is optimized for high reactivity with DAGs containing few tasks. The drawback is that if you build a single DAG with hundreds or thousands of tasks, performance will be quite awful...
This behaviour comes from the code in "jobs/scheduler_job.py", specifically the function _process_task_instances; there is even a comment pointing out the slow loop:
```
for run in active_dag_runs:
    self.log.debug("Examining active DAG run: %s", run)
    tis = run.get_task_instances(state=SCHEDULEABLE_STATES)

    # this loop is quite slow as it uses are_dependencies_met for
    # every task (in ti.is_runnable). This is also called in
    # update_state above which has already checked these tasks
    for ti in tis:
        task = dag.get_task(ti.task_id)
        # fixme: ti.task is transient but needs to be set
        ti.task = task
        if ti.are_dependencies_met(
                dep_context=DepContext(flag_upstream_failed=True),
                session=session):
            self.log.debug('Queuing task: %s', ti)
            task_instances_list.append(ti.key)
```
By running it with a few traces in place, we identified that the most time-consuming function was "ensure_finished_tasks" from the class DepContext.
This function takes around 20 ms per call on our platform (this may vary with the number of tasks and the database). It is called 800 times per DAG run, and since we launch 4 DAG runs sequentially that gives:
20 ms * 800 * 4 = 64 seconds
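For anyone who wants to reproduce that measurement, here is a minimal sketch of the kind of wall-clock instrumentation that surfaces this hot spot. The timed decorator and the monkey-patch line are only an illustration, not the exact traces we set and not anything that ships in Airflow:
```
import functools
import logging
import time

log = logging.getLogger(__name__)


def timed(func):
    """Log the wall-clock duration of every call to func, in milliseconds."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        try:
            return func(*args, **kwargs)
        finally:
            log.debug("%s took %.1f ms", func.__name__,
                      (time.time() - start) * 1000.0)
    return wrapper


# Example (illustrative): patch the suspected hot spot before the scheduler loop runs
# DepContext.ensure_finished_tasks = timed(DepContext.ensure_finished_tasks)
```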
Looking at the function, we see that if the list of finished tasks is already set, the database is not queried. So the solution was quite simple: retrieve the list of finished tasks once at the beginning of each DAG run iteration, then reuse that result inside the inner loop over task instances.
I ended up with the following code:
```
for run in active_dag_runs:
    self.log.debug("Examining active DAG run: %s", run)
    tis = run.get_task_instances(state=SCHEDULEABLE_STATES)

    # here we get the finished tasks for each execution date present
    # in the task instances
    dep_con = DepContext(flag_upstream_failed=True)
    ti_exec_date_array = list(set([ti.execution_date for ti in tis]))
    res = {}
    for exec_date in ti_exec_date_array:
        finished_tasks = list(dep_con.ensure_finished_tasks(dag, exec_date, session))
        res[exec_date] = finished_tasks

    # this loop is quite slow as it uses are_dependencies_met for
    # every task (in ti.is_runnable). This is also called in
    # update_state above which has already checked these tasks
    for ti in tis:
        task = dag.get_task(ti.task_id)
        # fixme: ti.task is transient but needs to be set
        ti.task = task
        dep_con = DepContext(flag_upstream_failed=True)
        # the finished tasks are set from the result of the loop above
        dep_con.finished_tasks = res[ti.execution_date]
        if ti.are_dependencies_met(
                dep_context=dep_con,
                session=session):
            self.log.debug('Queuing task: %s', ti)
            task_instances_list.append(ti.key)
```
Since we don't know whether the dep_context is modified during the execution of "are_dependencies_met", we decided to fetch the list of finished tasks once and apply it to a freshly created dep_context object for each task instance.
When setting the finished tasks on the dep_con object, we could also use a deepcopy to guarantee that finished_tasks is never modified (but why would it be?); that variant is sketched below.
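The deepcopy variant would only change the assignment inside the inner loop of the patch above (dep_con, res and ti are the same variables as in that snippet); it is purely a defensive sketch, not something we needed in practice:
```
import copy

# Defensive variant: give each DepContext its own deep copy of the cached
# finished-task list, so a mutation inside are_dependencies_met could never
# leak from one task instance to the next.
dep_con = DepContext(flag_upstream_failed=True)
dep_con.finished_tasks = copy.deepcopy(res[ti.execution_date])
```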
Conclusion:
With this dirty trick the DAG run now completes in under 10 seconds, which is not perfect but quite acceptable.
I hope this helps.