Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8c441a0d8 -> 4cd72b91b


[AIRFLOW-2777] speed up dag.sub_dag(...)

The previous version created the subdag by copying over all the tasks
and then filtering them down. It is a lot faster if we only copy over
the tasks we need.

Closes #3621 from abdul-stripe/faster-subdag


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4cd72b91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4cd72b91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4cd72b91

Branch: refs/heads/master
Commit: 4cd72b91bd024e6be9abb30aee345c0a90a6561b
Parents: 8c441a0
Author: Abdul Nimeri <[email protected]>
Authored: Thu Jul 26 20:53:11 2018 +0200
Committer: Bolke de Bruin <[email protected]>
Committed: Thu Jul 26 20:53:11 2018 +0200

----------------------------------------------------------------------
 airflow/models.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cd72b91/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index d6a9199..1d832ab 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3905,10 +3905,15 @@ class DAG(BaseDag, LoggingMixin):
         upstream and downstream neighbours based on the flag passed.
         """
 
+        # deep-copying self.task_dict takes a long time, and we don't want all
+        # the tasks anyway, so we copy the tasks manually later
+        task_dict = self.task_dict
+        self.task_dict = {}
         dag = copy.deepcopy(self)
+        self.task_dict = task_dict
 
         regex_match = [
-            t for t in dag.tasks if re.findall(task_regex, t.task_id)]
+            t for t in self.tasks if re.findall(task_regex, t.task_id)]
         also_include = []
         for t in regex_match:
             if include_downstream:
@@ -3917,7 +3922,9 @@ class DAG(BaseDag, LoggingMixin):
                 also_include += t.get_flat_relatives(upstream=True)
 
         # Compiling the unique list of tasks that made the cut
-        dag.task_dict = {t.task_id: t for t in regex_match + also_include}
+        # Make sure to not recursively deepcopy the dag while copying the task
+        dag.task_dict = {t.task_id: copy.deepcopy(t, {id(t.dag): t.dag})
+                         for t in regex_match + also_include}
         for t in dag.tasks:
             # Removing upstream/downstream references to tasks that did not
             # made the cut

Reply via email to