milletstephane opened a new issue #15654:
URL: https://github.com/apache/airflow/issues/15654
**Apache Airflow version**: 1.10.12
**Postgres version**: 9.6
**Python version**: 3.6.3
**SQLAlchemy version**: 1.3.20
**Environment**:
- **OS** (e.g. from /etc/os-release): Redhat 7
- **Kernel** (e.g. `uname -a`): 3.10.0-327.el7.x86_64
**What happened**:
Our team is running Airflow with the Celery executor inside a local cluster.
We implemented a DAG with around 800 tasks, and each task has an average of
4 dependencies.
The DAG is also configured with a max_active_runs of 4.
In airflow configuration we set the following parameters:
- [core]
parallelism = 32
dag_concurrency = 16
- [celery]
worker_concurrency = 16
When executing this DAG, its associated scheduler runtime is around 100 s.
In other words, it takes 100 seconds to launch 16 tasks.
**What you expected to happen**:
I would have expected the scheduler to be faster.
**How to reproduce it**:
Create a DAG with a large number of tasks and many dependencies between them.
**Anything else we need to know**:
By adding a few traces inside the Airflow code, I found that most of the
time was spent in the function get_failed_dep_statuses() in
the file airflow/models/taskinstance.py:
```
    @provide_session
    def get_failed_dep_statuses(self, dep_context=None, session=None):
        """Get failed Dependencies"""
        dep_context = dep_context or DepContext()
        for dep in dep_context.deps | self.task.deps:
            for dep_status in dep.get_dep_statuses(self, session, dep_context):
                self.log.debug(
                    "%s dependency '%s' PASSED: %s, %s",
                    self,
                    dep_status.dep_name,
                    dep_status.passed,
                    dep_status.reason,
                )
                if not dep_status.passed:
                    yield dep_status
```
During a single call of this function, several connections were made to the
database instead of the expected single one.
Because the function uses yield, it is a generator, and in Python 3 a
generator's elements are computed lazily, each time the caller asks for the
next one. When a database is involved, this results in a large number of
connections.
By replacing the yield with a list comprehension, we were able to halve
the runtime:
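The lazy evaluation described above can be seen in a minimal, self-contained sketch. Note that `fake_query` and `failed_statuses` are hypothetical stand-ins, not Airflow code; the `calls` list only records when each simulated database call happens.

```python
calls = []  # records each simulated database call, in order


def fake_query(name):
    """Hypothetical stand-in for a dependency check that hits the database."""
    calls.append(name)
    return name.startswith("bad")  # True means "dependency failed"


def failed_statuses(names):
    """Generator: the body runs only while the caller iterates."""
    for name in names:
        if fake_query(name):
            yield name


gen = failed_statuses(["ok_1", "bad_1", "ok_2", "bad_2"])
calls_before_iteration = list(calls)  # snapshot taken before any iteration
first = next(gen)                     # runs the body up to the first yield
calls_after_first = list(calls)       # snapshot after one item was requested
rest = list(gen)                      # drains the remaining items
```

Creating the generator runs none of the simulated queries; each request for the next item runs only as many as are needed to reach the next failed status. Whether this leads to extra connections depends on how the session is handled, which this sketch does not model.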
```
    @provide_session
    def get_failed_dep_statuses(self, dep_context=None, session=None):
        """Get failed Dependencies"""
        dep_context = dep_context or DepContext()
        dep_status_array = [
            dep_status
            for dep in dep_context.deps | self.task.deps
            for dep_status in dep.get_dep_statuses(self, session, dep_context)
            if not dep_status.passed
        ]
        return dep_status_array
```
By also removing the yield in the get_dep_statuses function, I think that we
could further decrease the runtime.
Unless there is a reason that I'm unaware of, "yield" should be avoided
in functions that interact with databases.
If my analysis is incorrect or if I missed some useful configuration, please
let me know.
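For anyone weighing the two variants, here is a minimal sketch of the general trade-off between a generator and a list. All names (`expensive_check`, `failed_lazy`, `failed_eager`) are hypothetical and not taken from Airflow, and I have not checked whether Airflow's callers actually rely on early exit.

```python
def expensive_check(dep, log):
    """Hypothetical stand-in for a dependency check; logs each evaluation."""
    log.append(dep)
    return dep.startswith("bad")  # True means "dependency failed"


def failed_lazy(deps, log):
    """Generator variant: evaluates dependencies one at a time."""
    for dep in deps:
        if expensive_check(dep, log):
            yield dep


def failed_eager(deps, log):
    """List variant: evaluates every dependency before returning."""
    return [dep for dep in deps if expensive_check(dep, log)]


deps = ["bad_1", "ok_1", "ok_2", "bad_2"]

lazy_log, eager_log = [], []
# A caller that only asks "is anything failing?" can stop at the first hit.
any_failed_lazy = any(True for _ in failed_lazy(deps, lazy_log))
any_failed_eager = bool(failed_eager(deps, eager_log))

# When fully consumed, both variants produce the same failed items.
same_result = list(failed_lazy(deps, [])) == failed_eager(deps, [])
```

Both variants return the same failed items when fully consumed. The generator lets a caller stop after the first failure, while the list always evaluates every dependency up front.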