Repository: incubator-airflow Updated Branches: refs/heads/master 8c441a0d8 -> 4cd72b91b
[AIRFLOW-2777] Speed up dag.sub_dag(...). The previous version created the subdag by copying over all the tasks and then filtering them down. It's a lot faster if we only copy over the tasks we need. Closes #3621 from abdul-stripe/faster-subdag Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4cd72b91 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4cd72b91 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4cd72b91 Branch: refs/heads/master Commit: 4cd72b91bd024e6be9abb30aee345c0a90a6561b Parents: 8c441a0 Author: Abdul Nimeri <[email protected]> Authored: Thu Jul 26 20:53:11 2018 +0200 Committer: Bolke de Bruin <[email protected]> Committed: Thu Jul 26 20:53:11 2018 +0200 ---------------------------------------------------------------------- airflow/models.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4cd72b91/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index d6a9199..1d832ab 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3905,10 +3905,15 @@ class DAG(BaseDag, LoggingMixin): upstream and downstream neighbours based on the flag passed. 
""" + # deep-copying self.task_dict takes a long time, and we don't want all + # the tasks anyway, so we copy the tasks manually later + task_dict = self.task_dict + self.task_dict = {} dag = copy.deepcopy(self) + self.task_dict = task_dict regex_match = [ - t for t in dag.tasks if re.findall(task_regex, t.task_id)] + t for t in self.tasks if re.findall(task_regex, t.task_id)] also_include = [] for t in regex_match: if include_downstream: @@ -3917,7 +3922,9 @@ class DAG(BaseDag, LoggingMixin): also_include += t.get_flat_relatives(upstream=True) # Compiling the unique list of tasks that made the cut - dag.task_dict = {t.task_id: t for t in regex_match + also_include} + # Make sure to not recursively deepcopy the dag while copying the task + dag.task_dict = {t.task_id: copy.deepcopy(t, {id(t.dag): t.dag}) + for t in regex_match + also_include} for t in dag.tasks: # Removing upstream/downstream references to tasks that did not # made the cut
