This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v3-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-0-test by this push:
     new 1398050e6c9 [Backport v3-0-test] Add back dag parsing pre-import optimization (#50371) (#52698)
1398050e6c9 is described below

commit 1398050e6c913d9de7bc6b3c318ea3b1c41532dc
Author: Kevin Liu <[email protected]>
AuthorDate: Fri Jul 4 16:25:27 2025 +0800

    [Backport v3-0-test] Add back dag parsing pre-import optimization (#50371) (#52698)
    
    * Add back dag parsing pre-import optimization (#50371)
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    
    * Apply suggestions from code review
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    Co-authored-by: Kaxil Naik <[email protected]>
---
 .../src/airflow/config_templates/config.yml        | 20 ++---
 airflow-core/src/airflow/configuration.py          |  1 +
 .../src/airflow/dag_processing/processor.py        | 28 +++++++
 .../tests/unit/dag_processing/test_processor.py    | 85 +++++++++++++++++++++-
 4 files changed, 122 insertions(+), 12 deletions(-)

diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index 92134fa4bf1..503084e13a9 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -2199,16 +2199,6 @@ scheduler:
       type: integer
       default: "20"
       see_also: ":ref:`scheduler:ha:tunables`"
-    parsing_pre_import_modules:
-      description: |
-        The scheduler reads dag files to extract the airflow modules that are going to be used,
-        and imports them ahead of time to avoid having to re-do it for each parsing process.
-        This flag can be set to ``False`` to disable this behavior in case an airflow module needs
-        to be freshly imported each time (at the cost of increased DAG parsing time).
-      version_added: 2.6.0
-      type: boolean
-      example: ~
-      default: "True"
     dag_stale_not_seen_duration:
       description: |
         Time in seconds after which dags, which were not updated by Dag Processor are deactivated.
@@ -2563,3 +2553,13 @@ dag_processor:
       type: integer
       example: ~
       default: "10"
+    parsing_pre_import_modules:
+      description: |
+        The dag_processor reads dag files to extract the airflow modules that are going to be used,
+        and imports them ahead of time to avoid having to re-do it for each parsing process.
+        This flag can be set to ``False`` to disable this behavior in case an airflow module needs
+        to be freshly imported each time (at the cost of increased DAG parsing time).
+      version_added: 2.6.0
+      type: boolean
+      example: ~
+      default: "True"
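
As a reading aid, not part of the diff itself: a minimal sketch of how the relocated option can be turned off. The option name and the AIRFLOW__DAG_PROCESSOR__PARSING_PRE_IMPORT_MODULES environment variable come from the config template above and the tests further down; the precedence of environment variables over airflow.cfg follows Airflow's usual config resolution.

    # Sketch only: disable dag-file pre-importing for the dag processor
    # (the shipped default above is "True").
    import os

    os.environ["AIRFLOW__DAG_PROCESSOR__PARSING_PRE_IMPORT_MODULES"] = "False"

    from airflow.configuration import conf

    # Environment variables take precedence over airflow.cfg values.
    assert conf.getboolean("dag_processor", "parsing_pre_import_modules") is False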
diff --git a/airflow-core/src/airflow/configuration.py b/airflow-core/src/airflow/configuration.py
index 2bf3df98163..5fd67f53343 100644
--- a/airflow-core/src/airflow/configuration.py
+++ b/airflow-core/src/airflow/configuration.py
@@ -364,6 +364,7 @@ class AirflowConfigParser(ConfigParser):
         ("fab", "navbar_text_hover_color"): ("webserver", 
"navbar_text_hover_color", "3.0.2"),
         ("api", "secret_key"): ("webserver", "secret_key", "3.0.2"),
         ("api", "enable_swagger_ui"): ("webserver", "enable_swagger_ui", 
"3.0.2"),
+        ("dag_processor", "parsing_pre_import_modules"): ("scheduler", 
"parsing_pre_import_modules", "3.0.3"),
     }
 
     # A mapping of new section -> (old section, since_version).
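
The entry added above maps the new ("dag_processor", "parsing_pre_import_modules") key onto its previous [scheduler] location. A rough sketch of what that mapping is expected to mean for existing deployments, assuming the usual deprecated-option fallback in AirflowConfigParser (the fallback and its DeprecationWarning are not shown in this diff):

    # Sketch only: a deployment that still sets the pre-3.0.3 option.
    import os

    os.environ["AIRFLOW__SCHEDULER__PARSING_PRE_IMPORT_MODULES"] = "False"

    from airflow.configuration import conf

    # Expected to resolve through the deprecated [scheduler] location and warn
    # that [dag_processor] parsing_pre_import_modules should be used instead.
    print(conf.getboolean("dag_processor", "parsing_pre_import_modules"))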
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 011393f22c8..a54c56ddbe2 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import importlib
 import os
 import sys
 import traceback
@@ -45,6 +46,7 @@ from airflow.sdk.execution_time.comms import (
 from airflow.sdk.execution_time.supervisor import WatchedSubprocess
 from airflow.serialization.serialized_objects import LazyDeserializedDAG, SerializedDAG
 from airflow.stats import Stats
+from airflow.utils.file import iter_airflow_imports
 
 if TYPE_CHECKING:
     from structlog.typing import FilteringBoundLogger
@@ -98,6 +100,27 @@ ToDagProcessor = Annotated[
 ]
 
 
+def _pre_import_airflow_modules(file_path: str, log: FilteringBoundLogger) -> None:
+    """
+    Pre-import Airflow modules found in the given file.
+
+    This prevents modules from being re-imported in each processing process,
+    saving CPU time and memory.
+    (The default value of "parsing_pre_import_modules" is set to True)
+
+    :param file_path: Path to the file to scan for imports
+    :param log: Logger instance to use for warnings
+    """
+    if not conf.getboolean("dag_processor", "parsing_pre_import_modules", fallback=True):
+        return
+
+    for module in iter_airflow_imports(file_path):
+        try:
+            importlib.import_module(module)
+        except ModuleNotFoundError as e:
+            log.warning("Error when trying to pre-import module '%s' found in 
%s: %s", module, file_path, e)
+
+
 def _parse_file_entrypoint():
     import structlog
 
@@ -127,6 +150,7 @@ def _parse_file_entrypoint():
 
 def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
     # TODO: Set known_pool names on DagBag!
+
     bag = DagBag(
         dag_folder=msg.file,
         bundle_path=msg.bundle_path,
@@ -250,6 +274,10 @@ class DagFileProcessorProcess(WatchedSubprocess):
         client: Client,
         **kwargs,
     ) -> Self:
+        logger = kwargs["logger"]
+
+        _pre_import_airflow_modules(os.fspath(path), logger)
+
         proc: Self = super().start(target=target, client=client, **kwargs)
         proc._on_child_started(callbacks, path, bundle_path)
         return proc
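
For context on the start() change above, also not part of the diff: the helper runs in the supervising process right before the parse subprocess is created, so the imports are done once rather than repeated for every parsed file (assuming the subprocess inherits the parent's interpreter state, e.g. via fork). A standalone sketch of what the helper does, using a made-up DAG file path:

    # Sketch only; mirrors _pre_import_airflow_modules added above.
    import importlib

    from airflow.utils.file import iter_airflow_imports

    # iter_airflow_imports should yield dotted "airflow.*" module names found
    # in the file without executing it; the path here is a hypothetical example.
    for module in iter_airflow_imports("/opt/airflow/dags/example_dag.py"):
        try:
            importlib.import_module(module)
        except ModuleNotFoundError:
            # The real helper logs a warning and continues instead of failing.
            pass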
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 8d77da61caf..e8ab15a0222 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -37,6 +37,7 @@ from airflow.dag_processing.processor import (
     DagFileParsingResult,
     DagFileProcessorProcess,
     _parse_file,
+    _pre_import_airflow_modules,
 )
 from airflow.models import DagBag, TaskInstance
 from airflow.models.baseoperator import BaseOperator
@@ -140,8 +141,13 @@ class TestDagFileProcessor:
         assert "a.py" in resp.import_errors
 
     def test_top_level_variable_access(
-        self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, inprocess_client
+        self,
+        spy_agency: SpyAgency,
+        tmp_path: pathlib.Path,
+        monkeypatch: pytest.MonkeyPatch,
+        inprocess_client,
     ):
+        logger = MagicMock()
         logger_filehandle = MagicMock()
 
         def dag_in_a_fn():
@@ -158,6 +164,7 @@ class TestDagFileProcessor:
             path=path,
             bundle_path=tmp_path,
             callbacks=[],
+            logger=logger,
             logger_filehandle=logger_filehandle,
             client=inprocess_client,
         )
@@ -171,8 +178,13 @@ class TestDagFileProcessor:
         assert result.serialized_dags[0].dag_id == "test_abc"
 
     def test_top_level_variable_access_not_found(
-        self, spy_agency: SpyAgency, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, inprocess_client
+        self,
+        spy_agency: SpyAgency,
+        tmp_path: pathlib.Path,
+        monkeypatch: pytest.MonkeyPatch,
+        inprocess_client,
     ):
+        logger = MagicMock()
         logger_filehandle = MagicMock()
 
         def dag_in_a_fn():
@@ -187,6 +199,7 @@ class TestDagFileProcessor:
             path=path,
             bundle_path=tmp_path,
             callbacks=[],
+            logger=logger,
             logger_filehandle=logger_filehandle,
             client=inprocess_client,
         )
@@ -203,6 +216,7 @@ class TestDagFileProcessor:
     def test_top_level_variable_set(self, tmp_path: pathlib.Path, inprocess_client):
         from airflow.models.variable import Variable as VariableORM
 
+        logger = MagicMock()
         logger_filehandle = MagicMock()
 
         def dag_in_a_fn():
@@ -218,6 +232,7 @@ class TestDagFileProcessor:
             path=path,
             bundle_path=tmp_path,
             callbacks=[],
+            logger=logger,
             logger_filehandle=logger_filehandle,
             client=inprocess_client,
         )
@@ -238,6 +253,7 @@ class TestDagFileProcessor:
     def test_top_level_variable_delete(self, tmp_path: pathlib.Path, inprocess_client):
         from airflow.models.variable import Variable as VariableORM
 
+        logger = MagicMock()
         logger_filehandle = MagicMock()
 
         def dag_in_a_fn():
@@ -259,6 +275,7 @@ class TestDagFileProcessor:
             path=path,
             bundle_path=tmp_path,
             callbacks=[],
+            logger=logger,
             logger_filehandle=logger_filehandle,
             client=inprocess_client,
         )
@@ -278,6 +295,7 @@ class TestDagFileProcessor:
     def test_top_level_connection_access(
         self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, inprocess_client
     ):
+        logger = MagicMock()
         logger_filehandle = MagicMock()
 
         def dag_in_a_fn():
@@ -295,6 +313,7 @@ class TestDagFileProcessor:
             path=path,
             bundle_path=tmp_path,
             callbacks=[],
+            logger=logger,
             logger_filehandle=logger_filehandle,
             client=inprocess_client,
         )
@@ -308,6 +327,7 @@ class TestDagFileProcessor:
         assert result.serialized_dags[0].dag_id == "test_my_conn"
 
     def test_top_level_connection_access_not_found(self, tmp_path: pathlib.Path, inprocess_client):
+        logger = MagicMock()
         logger_filehandle = MagicMock()
 
         def dag_in_a_fn():
@@ -323,6 +343,7 @@ class TestDagFileProcessor:
             path=path,
             bundle_path=tmp_path,
             callbacks=[],
+            logger=logger,
             logger_filehandle=logger_filehandle,
             client=inprocess_client,
         )
@@ -355,6 +376,7 @@ class TestDagFileProcessor:
             path=dag1_path,
             bundle_path=tmp_path,
             callbacks=[],
+            logger=MagicMock(),
             logger_filehandle=MagicMock(),
             client=inprocess_client,
         )
@@ -366,6 +388,65 @@ class TestDagFileProcessor:
         assert result.import_errors == {}
         assert result.serialized_dags[0].dag_id == "dag_name"
 
+    def test__pre_import_airflow_modules_when_disabled(self):
+        logger = MagicMock()
+        with (
+            env_vars({"AIRFLOW__DAG_PROCESSOR__PARSING_PRE_IMPORT_MODULES": "false"}),
+            patch("airflow.dag_processing.processor.iter_airflow_imports") as 
mock_iter,
+        ):
+            _pre_import_airflow_modules("test.py", logger)
+
+        mock_iter.assert_not_called()
+        logger.warning.assert_not_called()
+
+    def test__pre_import_airflow_modules_when_enabled(self):
+        logger = MagicMock()
+        with (
+            env_vars({"AIRFLOW__DAG_PROCESSOR__PARSING_PRE_IMPORT_MODULES": "true"}),
+            patch("airflow.dag_processing.processor.iter_airflow_imports", 
return_value=["airflow.models"]),
+            patch("airflow.dag_processing.processor.importlib.import_module") 
as mock_import,
+        ):
+            _pre_import_airflow_modules("test.py", logger)
+
+        mock_import.assert_called_once_with("airflow.models")
+        logger.warning.assert_not_called()
+
+    def test__pre_import_airflow_modules_warns_on_missing_module(self):
+        logger = MagicMock()
+        with (
+            env_vars({"AIRFLOW__DAG_PROCESSOR__PARSING_PRE_IMPORT_MODULES": "true"}),
+            patch(
+                "airflow.dag_processing.processor.iter_airflow_imports", 
return_value=["non_existent_module"]
+            ),
+            patch(
+                "airflow.dag_processing.processor.importlib.import_module", 
side_effect=ModuleNotFoundError()
+            ),
+        ):
+            _pre_import_airflow_modules("test.py", logger)
+
+        logger.warning.assert_called_once()
+        warning_args = logger.warning.call_args[0]
+        assert "Error when trying to pre-import module" in warning_args[0]
+        assert "non_existent_module" in warning_args[1]
+        assert "test.py" in warning_args[2]
+
+    def test__pre_import_airflow_modules_partial_success_and_warning(self):
+        logger = MagicMock()
+        with (
+            env_vars({"AIRFLOW__DAG_PROCESSOR__PARSING_PRE_IMPORT_MODULES": "true"}),
+            patch(
+                "airflow.dag_processing.processor.iter_airflow_imports",
+                return_value=["airflow.models", "non_existent_module"],
+            ),
+            patch(
+                "airflow.dag_processing.processor.importlib.import_module",
+                side_effect=[None, ModuleNotFoundError()],
+            ),
+        ):
+            _pre_import_airflow_modules("test.py", logger)
+
+        assert logger.warning.call_count == 1
+
 
 def write_dag_in_a_fn_to_file(fn: Callable[[], None], folder: pathlib.Path) -> pathlib.Path:
     # Create the dag in a fn, and use inspect.getsource to write it to a file so that
