This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 36e2e43def Remove double collection of dags in `airflow dags reserialize` (#27030)
36e2e43def is described below
commit 36e2e43def6a27d9bf2cab4d27d104414bea3f7f
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Oct 13 19:29:00 2022 +0100
Remove double collection of dags in `airflow dags reserialize` (#27030)
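Previously, the `airflow dags reserialize` code explicitly called
`dagbag.collect_dags` right after instantiating `DagBag`.
Because `DagBag.__init__` already calls `collect_dags`, the second call
processed the same DAGs again.
This change adds a `collect_dags` parameter to `DagBag.__init__` (default
`True`). Reserialization now passes `collect_dags=False` and performs a single
explicit `collect_dags(only_if_updated=False)` call. Note that the explicit
`safe_mode=False` argument is no longer passed, so reserialization now uses
the default `safe_mode` value of `collect_dags` instead of forcing it off.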
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
airflow/models/dagbag.py | 12 +++++++-----
airflow/utils/db.py | 4 ++--
tests/models/test_dagbag.py | 13 +++++++++++++
3 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index cabc142f31..849498c1e4 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -98,6 +98,7 @@ class DagBag(LoggingMixin):
read_dags_from_db: bool = False,
store_serialized_dags: bool | None = None,
load_op_links: bool = True,
+ collect_dags: bool = True,
):
# Avoid circular import
from airflow.models.dag import DAG
@@ -137,11 +138,12 @@ class DagBag(LoggingMixin):
self.dagbag_import_error_tracebacks = conf.getboolean('core',
'dagbag_import_error_tracebacks')
self.dagbag_import_error_traceback_depth = conf.getint('core',
'dagbag_import_error_traceback_depth')
- self.collect_dags(
- dag_folder=dag_folder,
- include_examples=include_examples,
- safe_mode=safe_mode,
- )
+ if collect_dags:
+ self.collect_dags(
+ dag_folder=dag_folder,
+ include_examples=include_examples,
+ safe_mode=safe_mode,
+ )
# Should the extra operator link be loaded via plugins?
# This flag is set to False in Scheduler so that Extra Operator links
are not loaded
self.load_op_links = load_op_links
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index f796156f8f..d581925791 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -831,8 +831,8 @@ def reserialize_dags(*, session: Session = NEW_SESSION) -> None:
from airflow.models.serialized_dag import SerializedDagModel
session.query(SerializedDagModel).delete(synchronize_session=False)
- dagbag = DagBag()
- dagbag.collect_dags(only_if_updated=False, safe_mode=False)
+ dagbag = DagBag(collect_dags=False)
+ dagbag.collect_dags(only_if_updated=False)
dagbag.sync_to_db(session=session)
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index ed20d41ef0..f55a31400d 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -1060,3 +1060,16 @@ class TestDagBag:
dagbag = DagBag(dag_folder=dag_file, include_examples=False)
assert len(dagbag.dag_ids) == 0
assert "has no tags" in dagbag.import_errors[dag_file]
+
+ def test_dagbag_dag_collection(self):
+
+ dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False,
collect_dags=False)
+ # since collect_dags is False, dagbag.dags should be empty
+ assert not dagbag.dags
+
+ dagbag.collect_dags()
+ assert dagbag.dags
+
+ # test that dagbag.dags is not empty if collect_dags is True
+ dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False)
+ assert dagbag.dags