This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 32ef0cd [AIRFLOW-3607] Optimize dep checking when depends on past set and concurrency limit
32ef0cd is described below
commit 32ef0cd717d181b9f0c37caae312404385a5c9dd
Author: amichai07 <[email protected]>
AuthorDate: Fri Jun 5 13:38:29 2020 +0300
[AIRFLOW-3607] Optimize dep checking when depends on past set and concurrency limit
---
airflow/models/dagrun.py | 19 +++++--------------
1 file changed, 5 insertions(+), 14 deletions(-)
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 470dbeb..d30d110 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -305,24 +305,15 @@ class DagRun(Base, LoggingMixin):
for t in unfinished_tasks)
if unfinished_tasks:
scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in
SCHEDULEABLE_STATES]
+ self.log.debug(
+ "number of scheduleable tasks for %s: %s task(s)",
+ self, len(scheduleable_tasks))
+ ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks,
finished_tasks, session)
+ self.log.debug("ready tis length for %s: %s task(s)", self,
len(ready_tis))
if none_depends_on_past and none_task_concurrency:
# small speed up
- self.log.debug(
- "number of scheduleable tasks for %s: %s task(s)",
- self, len(scheduleable_tasks))
- ready_tis, changed_tis =
self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
- self.log.debug("ready tis length for %s: %s task(s)", self,
len(ready_tis))
are_runnable_tasks = ready_tis or self._are_premature_tis(
unfinished_tasks, finished_tasks, session) or changed_tis
- else:
- # slow path
- for ti in scheduleable_tasks:
- if ti.are_dependencies_met(
- dep_context=DepContext(flag_upstream_failed=True),
- session=session
- ):
- self.log.debug('Queuing task: %s', ti)
- ready_tis.append(ti)
duration = (timezone.utcnow() - start_dttm)
Stats.timing("dagrun.dependency-check.{}".format(self.dag_id),
duration)