milletstephane edited a comment on issue #15654: URL: https://github.com/apache/airflow/issues/15654#issuecomment-833523962
[scheduler_job.py.txt](https://github.com/apache/airflow/files/6434885/scheduler_job.py.txt)

If, like me, you are stuck with an old version of Airflow, here is an update on the problem. My assumption that the problem was linked to the time taken to establish a connection with the database was not correct. And it seems that rather than a bug, this behaviour is a design choice: the scheduler is optimized for high reactivity with DAGs containing few tasks. The drawback is that if you build a single DAG with hundreds or thousands of tasks, performance will be quite awful...

This behaviour comes from the code in `jobs/scheduler_job.py`, specifically the `_process_task_instances` function, and there is even a comment flagging the slow loop:

```
for run in active_dag_runs:
    self.log.debug("Examining active DAG run: %s", run)
    tis = run.get_task_instances(state=SCHEDULEABLE_STATES)

    # this loop is quite slow as it uses are_dependencies_met for
    # every task (in ti.is_runnable). This is also called in
    # update_state above which has already checked these tasks
    for ti in tis:
        task = dag.get_task(ti.task_id)

        # fixme: ti.task is transient but needs to be set
        ti.task = task

        if ti.are_dependencies_met(
                dep_context=DepContext(flag_upstream_failed=True),
                session=session):
            self.log.debug('Queuing task: %s', ti)
            task_instances_list.append(ti.key)
```

By running it with a few traces (see the tracing sketch at the end of this comment), we identified that the most time-consuming function was `ensure_finished_tasks` from the `DepContext` class. Its runtime is around 20 ms on our platform (this may vary with the number of tasks or the database), and it is called 800 times per DAG run, four times over since we launch 4 DAG runs sequentially: 20 ms * 800 * 4 = 64 seconds.

Looking at the function, we see that if the list of finished tasks is already set, the database won't be queried. So the solution was quite simple: retrieve the list of finished tasks once at the beginning of each DAG run iteration, then reuse that result inside the inner loop over task instances. I ended up with the following code:

```
for run in active_dag_runs:
    self.log.debug("Examining active DAG run: %s", run)
    tis = run.get_task_instances(state=SCHEDULEABLE_STATES)

    # here we get the finished tasks once for each execution date
    # present in the task instances
    dep_con = DepContext(flag_upstream_failed=True)
    ti_exec_date_array = list(set([ti.execution_date for ti in tis]))
    res = {}
    for exec_date in ti_exec_date_array:
        finished_tasks = list(dep_con.ensure_finished_tasks(dag, exec_date, session))
        res[exec_date] = finished_tasks

    # this loop is quite slow as it uses are_dependencies_met for
    # every task (in ti.is_runnable). This is also called in
    # update_state above which has already checked these tasks
    for ti in tis:
        task = dag.get_task(ti.task_id)

        # fixme: ti.task is transient but needs to be set
        ti.task = task

        dep_con = DepContext(flag_upstream_failed=True)
        # the finished tasks are set from the result of the loop above,
        # so are_dependencies_met won't hit the database again
        dep_con.finished_tasks = res[ti.execution_date]
        if ti.are_dependencies_met(
                dep_context=dep_con,
                session=session):
            self.log.debug('Queuing task: %s', ti)
            task_instances_list.append(ti.key)
```

Since we don't know whether the dep_context is modified during the execution of `are_dependencies_met`, we decided to fetch the list of finished tasks once and set it on a freshly created `DepContext` object for each task instance. When setting the finished tasks on the `dep_con` object, we could also use a deepcopy to ensure that `finished_tasks` won't be modified (but why would it be?), as in the sketch below.
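A minimal sketch of that deepcopy variant, using the same names as in the fix above; `copy.deepcopy` is the only addition, and whether it is actually needed is an open question:

```
import copy

for ti in tis:
    task = dag.get_task(ti.task_id)
    # fixme: ti.task is transient but needs to be set
    ti.task = task

    dep_con = DepContext(flag_upstream_failed=True)
    # deepcopy so that are_dependencies_met cannot mutate the cached list;
    # likely unnecessary, but it makes reusing the cache strictly safe
    dep_con.finished_tasks = copy.deepcopy(res[ti.execution_date])
    if ti.are_dependencies_met(dep_context=dep_con, session=session):
        self.log.debug('Queuing task: %s', ti)
        task_instances_list.append(ti.key)
```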
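And for reference, the traces mentioned above were along these lines. This is a hypothetical monkey-patch harness, not part of Airflow: the `DepContext` import path is the 1.10.x one, while `_stats` and the wrapper are ours:

```
import time
from functools import wraps

from airflow.ti_deps.dep_context import DepContext

_stats = {"calls": 0, "total_s": 0.0}
_original = DepContext.ensure_finished_tasks

@wraps(_original)
def _timed_ensure_finished_tasks(self, *args, **kwargs):
    # accumulate call count and wall-clock time spent in ensure_finished_tasks
    start = time.perf_counter()
    try:
        return _original(self, *args, **kwargs)
    finally:
        _stats["calls"] += 1
        _stats["total_s"] += time.perf_counter() - start

DepContext.ensure_finished_tasks = _timed_ensure_finished_tasks

# after one scheduling loop, on our platform:
# _stats["calls"] was ~800 per DAG run and the average call took ~20 ms
```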
Conclusion: with this dirty trick the DAG runtime is now below 10 seconds, which is not perfect but quite acceptable. I hope this helps.
