milletstephane opened a new issue #15654:
URL: https://github.com/apache/airflow/issues/15654


   **Apache Airflow version**: 1.10.12
   **Postgres version**: 9.6
   **Python version**: 3.6.3
   **SQLAlchemy version**: 1.3.20
   
   **Environment**:
   
   - **OS** (e.g. from /etc/os-release): Redhat 7
   - **Kernel** (e.g. `uname -a`): 3.10.0-327.el7.x86_64
   
   **What happened**:
   
   Our team is running Airflow with the Celery Executor inside a local cluster.
   
   We implemented a DAG with around 800 tasks, and each task has an average of 4 dependencies.
   The DAG is also configured with max_active_runs = 4.
   
   In the Airflow configuration we set the following parameters (see the airflow.cfg excerpt below):
    - [core]
       parallelism = 32
       dag_concurrency = 16
    - [celery]
       worker_concurrency = 16
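   
   For reference, a minimal airflow.cfg excerpt with these settings (section and key names as used in Airflow 1.10.x):
   
   ```
   [core]
   parallelism = 32
   dag_concurrency = 16
   
   [celery]
   worker_concurrency = 16
   ```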
   
   When executing this DAG, the scheduler loop runtime is around 100 seconds; in other words, it takes 100 seconds to launch 16 tasks.
   
   **What you expected to happen**:
   
   I would have expected the scheduler to be faster. 
   
   **How to reproduce it**:
   
   Create a DAG with a large number of tasks and many dependencies between tasks.
   
   **Anything else we need to know**:
   
   By adding a few trace statements inside the Airflow code, I identified that most of the time was spent in the function get_failed_dep_statuses() in airflow/models/taskinstance.py:
   
   ```
       @provide_session
       def get_failed_dep_statuses(self, dep_context=None, session=None):
           """Get failed Dependencies"""
           dep_context = dep_context or DepContext()
           for dep in dep_context.deps | self.task.deps:
               for dep_status in dep.get_dep_statuses(self, session, dep_context):
                   self.log.debug(
                       "%s dependency '%s' PASSED: %s, %s",
                       self,
                       dep_status.dep_name,
                       dep_status.passed,
                       dep_status.reason,
                   )

                   if not dep_status.passed:
                       yield dep_status
   ```
   
   During a single call to this function, several connections were made to the database instead of the single one I expected.
   
   Because this function uses yield it returns a generator, and in Python 3 the elements of a generator are computed lazily, on demand, thus generating a large number of connections when a database is involved.
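   
   To make the lazy-evaluation point concrete, here is a minimal standalone sketch (not Airflow code; fake_query is a made-up stand-in for a call that hits the database):
   
   ```
   def fake_query(i):
       """Stand-in for a call that opens a connection / runs a query."""
       print("query #%d executed" % i)
       return i
   
   def lazy_statuses():
       # Each element is only computed when the consumer asks for it.
       for i in range(3):
           yield fake_query(i)
   
   def eager_statuses():
       # All of the work happens here, in one pass, before the caller iterates.
       return [fake_query(i) for i in range(3)]
   
   gen = lazy_statuses()       # nothing printed yet: no query has run
   first = next(gen)           # prints "query #0 executed" - one query per next()
   results = eager_statuses()  # all three queries run immediately, back to back
   ```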
   
   By replacing the yield with a list comprehension, we were able to halve the runtime:
   
   ```
       @provide_session
       def get_failed_dep_statuses(self, dep_context=None, session=None):
           """Get failed Dependencies"""
           dep_context = dep_context or DepContext()
           return [
               dep_status
               for dep in dep_context.deps | self.task.deps
               for dep_status in dep.get_dep_statuses(self, session, dep_context)
               if not dep_status.passed
           ]
   ```
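   
   Since a list is iterable just like a generator, existing call sites should keep working unchanged with this version; only the evaluation timing changes. A small usage sketch (ti and dep_context are illustrative placeholders, not specific call sites):
   
   ```
   failed = ti.get_failed_dep_statuses(dep_context=dep_context)
   for dep_status in failed:
       print(dep_status.dep_name, dep_status.reason)
   ```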
   
   By also removing the yield in the get_dep_statuses function, I think we could further decrease the runtime, as sketched below.
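   
   A minimal sketch of that idea, assuming get_dep_statuses delegates to an internal _get_dep_statuses generator as in airflow/ti_deps/deps/base_ti_dep.py (simplified; the real method has additional short-circuit branches, so this is illustrative only):
   
   ```
       @provide_session
       def get_dep_statuses(self, ti, session, dep_context=None):
           dep_context = dep_context or DepContext()
           # Materialize every status in one pass instead of yielding lazily,
           # so the database work is not spread across the whole iteration.
           return list(self._get_dep_statuses(ti, session, dep_context))
   ```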
   
   Unless there is a reason I'm unaware of, yield should be avoided in functions that interact with databases.
   If my analysis is incorrect, or if I missed some useful configuration, please let me know.

