milletstephane commented on issue #15654:
URL: https://github.com/apache/airflow/issues/15654#issuecomment-833523962


   If, like me, you are stuck with an old version of Airflow, here is an update 
on the problem.
   
   My assumption that the problem was linked to the time taken to establish a 
connection with the database was not correct. 
   
   And it seems that rather than a bug, this behaviour is a choice. The 
scheduler is optimized for high reactivity with DAGs containing few tasks. The 
drawback is that if you build a single DAG with hundreds or thousands of tasks, 
then performance will be quite awful...
   
   This behaviour is induced by the code of the file "jobs/scheduler_job.py", 
specifically the function _process_task_instances, and there is even a 
comment pointing out the slow loop: 
   
   ```
           for run in active_dag_runs:
               self.log.debug("Examining active DAG run: %s", run)
               tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
   
               # this loop is quite slow as it uses are_dependencies_met for
               # every task (in ti.is_runnable). This is also called in
               # update_state above which has already checked these tasks
               for ti in tis:
                   task = dag.get_task(ti.task_id)
   
                   # fixme: ti.task is transient but needs to be set
                   ti.task = task
   
                   if ti.are_dependencies_met(
                           dep_context=DepContext(flag_upstream_failed=True),
                           session=session):
                       self.log.debug('Queuing task: %s', ti)
                       task_instances_list.append(ti.key)
   ```
   
   By running it with a few traces, we identified that the most time-consuming 
function was "ensure_finished_tasks" from the class DepContext. 
   Its runtime is around 20 ms on our platform (this may vary with the number 
of tasks and the database). It is called 800 times per DAG run, and 4 times 
over since we launch 4 DAG runs sequentially.
   
   20 ms * 800 * 4 = 64 seconds
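   
   For reference, this is roughly how the measurement can be reproduced (a 
sketch only, not our exact traces; dag, ti, session, self.log and DepContext 
are the names from the scheduler loop shown above, while the timing code and 
log message are just illustration):
   
   ```
   import time
   
   # sketch: time a single call to ensure_finished_tasks for one task instance
   dep_con = DepContext(flag_upstream_failed=True)
   start = time.time()
   finished = list(dep_con.ensure_finished_tasks(dag, ti.execution_date, session))
   elapsed_ms = (time.time() - start) * 1000
   self.log.debug("ensure_finished_tasks: %d tasks in %.1f ms", len(finished), elapsed_ms)
   ```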
   
   Looking at the function, we can see that if the list of finished tasks is 
already set, the database won't be queried. So the solution was quite simple: 
retrieve the list of finished tasks once at the beginning of each DAG run 
iteration, then reuse that result inside the inner loop over the tasks.
   I ended up with the following code:
   
   ```
            for run in active_dag_runs:
                self.log.debug("Examining active DAG run: %s", run)
                tis = run.get_task_instances(state=SCHEDULEABLE_STATES)

                # here we get the finished tasks once per execution date
                # present in the task instances
                dep_con = DepContext(flag_upstream_failed=True)
                ti_exec_date_array = list(set(ti.execution_date for ti in tis))
                res = {}
                for exec_date in ti_exec_date_array:
                    finished_tasks = list(dep_con.ensure_finished_tasks(
                        dag, exec_date, session))
                    res[exec_date] = finished_tasks

                # this loop is quite slow as it uses are_dependencies_met for
                # every task (in ti.is_runnable). This is also called in
                # update_state above which has already checked these tasks
                for ti in tis:
                    task = dag.get_task(ti.task_id)

                    # fixme: ti.task is transient but needs to be set
                    ti.task = task

                    dep_con = DepContext(flag_upstream_failed=True)

                    # the finished tasks are set from the result of the
                    # per-execution-date loop above
                    dep_con.finished_tasks = res[ti.execution_date]

                    if ti.are_dependencies_met(
                            dep_context=dep_con,
                            session=session):
                        self.log.debug('Queuing task: %s', ti)
                        task_instances_list.append(ti.key)
   ```
   
   As we don't know whether the dep_context is modified during the execution of 
"are_dependencies_met", we decided to keep creating a new DepContext for each 
task and only set its list of finished tasks from the result retrieved above.  
   
   When setting the finished tasks on the dep_con object, we could also use a 
deepcopy to make sure that finished_tasks won't be modified (but why would 
it be?).
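   
   For reference, that variant would just replace the assignment inside the 
task loop (a sketch using the same names as in the snippet above):
   
   ```
   from copy import deepcopy
   
   # give each DepContext its own copy of the cached list, so that nothing done
   # inside are_dependencies_met could mutate the shared entry stored in res
   dep_con = DepContext(flag_upstream_failed=True)
   dep_con.finished_tasks = deepcopy(res[ti.execution_date])
   ```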
   
   Conclusion:
   
   With this dirty trick the DAG run time is now below 10 seconds, which is not 
perfect but quite decent. 
   I hope this helps

