This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 4439c931429103b1675acc58696f46c591319741
Author: Raphaël Vandon <[email protected]>
AuthorDate: Fri Apr 14 10:22:19 2023 -0700

    preload airflow imports before dag parsing to save time (#30495)
    
    ---------
    
    Co-authored-by: Tzu-ping Chung <[email protected]>
    Co-authored-by: Ephraim Anierobi <[email protected]>
    (cherry picked from commit 9fab11cf51411276f3e70a24a23795bf959784d9)
---
 .pre-commit-config.yaml                      |  4 +--
 airflow/config_templates/config.yml          | 11 ++++++-
 airflow/config_templates/default_airflow.cfg |  6 ++++
 airflow/dag_processing/processor.py          | 19 +++++++++++
 airflow/utils/file.py                        | 21 +++++++++++++
 tests/dag_processing/test_job_runner.py      |  3 +-
 tests/dags/test_imports.py                   | 47 ++++++++++++++++++++++++++++
 tests/jobs/test_scheduler_job.py             |  1 +
 tests/utils/test_file.py                     | 26 +++++++++++++++
 9 files changed, 134 insertions(+), 4 deletions(-)
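
In short, the change teaches the DAG file processor to scan a DAG file for the
airflow modules it imports and to import them ahead of time, before the parsing
process is started, so that each parsing process does not have to redo that work.
A minimal, standalone sketch of the idea (not the exact code added below; the real
implementation is iter_airflow_imports() in airflow/utils/file.py plus the
pre-import loop in DagFileProcessorProcess.start()):

    import ast
    import importlib
    from pathlib import Path

    def pre_import_airflow_modules(dag_file: str) -> None:
        # Parse the DAG file without executing it and collect module-level imports.
        try:
            tree = ast.parse(Path(dag_file).read_bytes())
        except (OSError, SyntaxError, UnicodeDecodeError):
            return  # an unreadable or invalid file is reported later, when the DAG is parsed
        for node in tree.body:  # only top-level statements; local/conditional imports are skipped
            if isinstance(node, ast.Import):
                names = [alias.name for alias in node.names]
            elif isinstance(node, ast.ImportFrom) and node.module is not None:
                names = [node.module]
            else:
                continue
            for name in names:
                if name.startswith("airflow."):
                    try:
                        importlib.import_module(name)
                    except Exception:
                        pass  # the real code logs a warning and keeps going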

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index cc33daf958..36e80151ee 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -181,7 +181,7 @@ repos:
         entry: ruff --fix --no-update-check --force-exclude
         additional_dependencies: ['ruff==0.0.226']
         files: \.pyi?$
-        exclude: ^.*/.*_vendor/
+        exclude: ^.*/.*_vendor/|^tests/dags/test_imports.py
   - repo: https://github.com/asottile/blacken-docs
     rev: 1.13.0
     hooks:
@@ -905,7 +905,7 @@ repos:
         language: python
         entry: ./scripts/ci/pre_commit/pre_commit_mypy.py --namespace-packages
         files: \.py$
-        exclude: ^.*/.*_vendor/|^airflow/migrations|^airflow/providers|^dev|^docs|^provider_packages|^tests/providers|^tests/system/providers
+        exclude: ^.*/.*_vendor/|^airflow/migrations|^airflow/providers|^dev|^docs|^provider_packages|^tests/providers|^tests/system/providers|^tests/dags/test_imports.py
         require_serial: true
         additional_dependencies: ['rich>=12.4.4', 'inputimeout', 'pyyaml']
       - id: mypy-providers
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1d458a8c36..43bad45e41 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -2324,6 +2324,16 @@ scheduler:
       version_added: 2.0.0
       type: boolean
       default: "True"
+    parsing_pre_import_modules:
+      description: |
+        The scheduler reads dag files to extract the airflow modules that are going to be used,
+        and imports them ahead of time to avoid having to re-do it for each parsing process.
+        This flag can be set to False to disable this behavior in case an airflow module needs to be freshly
+        imported each time (at the cost of increased DAG parsing time).
+      version_added: 2.6.0
+      type: boolean
+      example: ~
+      default: "True"
     parsing_processes:
       description: |
         The scheduler can run multiple processes in parallel to parse dags.
@@ -2343,7 +2353,6 @@ scheduler:
           same host. This is useful when running with Scheduler in HA mode where each scheduler can
           parse different DAG files.
         * ``alphabetical``: Sort by filename
-
       version_added: 2.1.0
       type: string
       example: ~
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 69393e014e..e5d92429c0 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -1178,6 +1178,12 @@ max_dagruns_per_loop_to_schedule = 20
 # dags in some circumstances
 schedule_after_task_execution = True
 
+# The scheduler reads dag files to extract the airflow modules that are going to be used,
+# and imports them ahead of time to avoid having to re-do it for each parsing process.
+# This flag can be set to False to disable this behavior in case an airflow module needs to be freshly
+# imported each time (at the cost of increased DAG parsing time).
+parsing_pre_import_modules = True
+
 # The scheduler can run multiple processes in parallel to parse dags.
 # This defines how many processes will run.
 parsing_processes = 2
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index f1ba08e496..442431afb5 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import importlib
 import logging
 import multiprocessing
 import os
@@ -50,6 +51,7 @@ from airflow.models.taskinstance import TaskInstance as TI
 from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.email import get_email_address_list, send_email
+from airflow.utils.file import iter_airflow_imports
 from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
 from airflow.utils.mixins import MultiprocessingStartMethodMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -187,6 +189,23 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
 
     def start(self) -> None:
         """Launch the process and start processing the DAG."""
+        if conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
+            # Read the file to pre-import airflow modules used.
+            # This prevents them from being re-imported from zero in each "processing" process
+            # and saves CPU time and memory.
+            for module in iter_airflow_imports(self.file_path):
+                try:
+                    importlib.import_module(module)
+                except Exception as e:
+                    # only log as warning because an error here is not preventing anything from working, and
+                    # if it's serious, it's going to be surfaced to the user when the dag is actually parsed.
+                    self.log.warning(
+                        "Error when trying to pre-import module '%s' found in 
%s: %s",
+                        module,
+                        self.file_path,
+                        e,
+                    )
+
         context = self._get_multiprocessing_context()
 
         _parent_channel, _child_channel = context.Pipe(duplex=False)
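
The pre-import above runs in the parent, before the parsing subprocess is created.
With the "fork" multiprocessing start method (the usual default on Linux), modules
already imported by the parent are inherited by the child, so the child finds them
in sys.modules instead of importing them again, which is presumably where most of
the saving comes from. A small illustration of that behavior, independent of
Airflow (the json module stands in for a heavier airflow module):

    import multiprocessing
    import sys

    def report(module_name: str) -> None:
        # Runs in the child: a forked child inherits the parent's imported modules.
        print(module_name, "already loaded in child:", module_name in sys.modules)

    if __name__ == "__main__":
        import json  # stand-in for a heavy module imported by the parent up front

        ctx = multiprocessing.get_context("fork")  # "fork" is POSIX-only
        p = ctx.Process(target=report, args=("json",))
        p.start()
        p.join()
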
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index b3b1e8f3d1..81089e06d4 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+import ast
 import io
 import logging
 import os
@@ -371,3 +372,23 @@ def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.Zi
             content = dag_file.read()
     content = content.lower()
     return all(s in content for s in (b"dag", b"airflow"))
+
+
+def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
+    for st in module.body:
+        if isinstance(st, ast.Import):
+            for n in st.names:
+                yield n.name
+        elif isinstance(st, ast.ImportFrom) and st.module is not None:
+            yield st.module
+
+
+def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
+    """Find Airflow modules imported in the given file."""
+    try:
+        parsed = ast.parse(Path(file_path).read_bytes())
+    except (OSError, SyntaxError, UnicodeDecodeError):
+        return
+    for m in _find_imported_modules(parsed):
+        if m.startswith("airflow."):
+            yield m
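
For reference, the new helper can also be exercised on its own; it yields only
module-level "airflow.*" imports and yields nothing for files it cannot read or
parse (the path below is a placeholder, not from this commit):

    from airflow.utils.file import iter_airflow_imports

    for module in iter_airflow_imports("/path/to/dags/my_dag.py"):  # placeholder path
        print(module)  # e.g. "airflow.decorators", "airflow.models"
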
diff --git a/tests/dag_processing/test_job_runner.py b/tests/dag_processing/test_job_runner.py
index 4758b140aa..9a83b9feb2 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -211,7 +211,8 @@ class TestDagProcessorJobRunner:
         parent_pipe.close()
 
     @pytest.mark.backend("mysql", "postgres")
-    def test_start_new_processes_with_same_filepath(self):
+    @mock.patch("airflow.dag_processing.processor.iter_airflow_imports")
+    def test_start_new_processes_with_same_filepath(self, _):
         """
         Test that when a processor already exist with a filepath, a new processor won't be created
         with that filepath. The filepath will just be removed from the list.
diff --git a/tests/dags/test_imports.py b/tests/dags/test_imports.py
new file mode 100644
index 0000000000..43be6fc08e
--- /dev/null
+++ b/tests/dags/test_imports.py
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# fmt: off
+
+# this file contains sample code that only needs to pass the lexer
+# it is "badly" formatted on purpose to test edge cases.
+
+from __future__ import annotations
+
+# multiline import
+import  \
+        datetime,   \
+enum,time
+"""
+import airflow.in_comment
+"""
+# from import
+from airflow.utils import file
+# multiline airflow import
+import airflow.decorators, airflow.models\
+, airflow.sensors
+
+if prod:
+    import airflow.if_branch
+else:
+    import airflow.else_branch
+
+def f():
+    # local import
+    import airflow.local_import
+
+# fmt: on
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index e2d429c56f..3595623260 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3113,6 +3113,7 @@ class TestSchedulerJob:
             "test_ignore_this.py",
             "test_invalid_param.py",
             "test_nested_dag.py",
+            "test_imports.py",
             "__init__.py",
         }
         for root, _, files in os.walk(TEST_DAG_FOLDER):
diff --git a/tests/utils/test_file.py b/tests/utils/test_file.py
index e5e51f7b20..448ddf31c7 100644
--- a/tests/utils/test_file.py
+++ b/tests/utils/test_file.py
@@ -188,3 +188,29 @@ class TestListPyFilesPath:
 
         # With safe_mode is False, the user defined callable won't be invoked
         assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=False)
+
+    def test_get_modules(self):
+        file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.py")
+
+        modules = list(file_utils.iter_airflow_imports(file_path))
+
+        assert len(modules) == 4
+        assert "airflow.utils" in modules
+        assert "airflow.decorators" in modules
+        assert "airflow.models" in modules
+        assert "airflow.sensors" in modules
+        # this one is a local import, we don't want it.
+        assert "airflow.local_import" not in modules
+        # this one is in a comment, we don't want it
+        assert "airflow.in_comment" not in modules
+        # we don't want imports under conditions
+        assert "airflow.if_branch" not in modules
+        assert "airflow.else_branch" not in modules
+
+    def test_get_modules_from_invalid_file(self):
+        file_path = os.path.join(TEST_DAGS_FOLDER, "README.md")  # just getting a non-python file
+
+        # should not error
+        modules = list(file_utils.iter_airflow_imports(file_path))
+
+        assert len(modules) == 0
