This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 32ef0cd  [AIRFLOW-3607] Optimize dep checking when depends on past set and concurrency limit
32ef0cd is described below

commit 32ef0cd717d181b9f0c37caae312404385a5c9dd
Author: amichai07 <[email protected]>
AuthorDate: Fri Jun 5 13:38:29 2020 +0300

    [AIRFLOW-3607] Optimize dep checking when depends on past set and concurrency limit
---
 airflow/models/dagrun.py | 19 +++++--------------
 1 file changed, 5 insertions(+), 14 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 470dbeb..d30d110 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -305,24 +305,15 @@ class DagRun(Base, LoggingMixin):
                                     for t in unfinished_tasks)
         if unfinished_tasks:
             scheduleable_tasks = [ut for ut in unfinished_tasks if ut.state in SCHEDULEABLE_STATES]
+            self.log.debug(
+                "number of scheduleable tasks for %s: %s task(s)",
+                self, len(scheduleable_tasks))
+            ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
+            self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
             if none_depends_on_past and none_task_concurrency:
                 # small speed up
-                self.log.debug(
-                    "number of scheduleable tasks for %s: %s task(s)",
-                    self, len(scheduleable_tasks))
-                ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
-                self.log.debug("ready tis length for %s: %s task(s)", self, len(ready_tis))
                 are_runnable_tasks = ready_tis or self._are_premature_tis(
                     unfinished_tasks, finished_tasks, session) or changed_tis
-            else:
-                # slow path
-                for ti in scheduleable_tasks:
-                    if ti.are_dependencies_met(
-                        dep_context=DepContext(flag_upstream_failed=True),
-                        session=session
-                    ):
-                        self.log.debug('Queuing task: %s', ti)
-                        ready_tis.append(ti)
 
         duration = (timezone.utcnow() - start_dttm)
         Stats.timing("dagrun.dependency-check.{}".format(self.dag_id), duration)

Reply via email to