This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 381320832dc661dfbb8cba8ddb3383b52f05b516
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu May 4 18:18:28 2023 +0100

    Fix issue in pre-importing modules in zipfile (#31061)
    
    This fixes a scheduler crash caused by reading a zipped DAG file as source
    code when trying to pre-import Airflow modules from the Python modules
    inside the zip file. The fix is to enumerate the Python modules inside the
    zip file and then pre-import the Airflow modules they use.
    
    Apply suggestions from code review.
    
    (cherry picked from commit 34b6230f3c7815b8ae7e99443e45a56921059d3f)
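
    For context: the crash happens because a zip archive is not valid Python
    source. Zip headers contain NUL bytes, and feeding the raw archive to
    Python's parser raises ValueError. A minimal standalone sketch of the
    failure mode (it assumes the pre-import scan parses the whole file with
    ast, which is how iter_airflow_imports effectively treated zips before
    this fix; the exact error message may vary by Python version):

        import ast
        import io
        import zipfile

        # Build an in-memory zip holding a perfectly parseable DAG file.
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, "w") as z:
            z.writestr("dag.py", "import airflow\n")

        try:
            # Parsing the raw archive bytes, rather than the member's source,
            # fails because zip headers contain NUL bytes.
            ast.parse(buf.getvalue())
        except ValueError as e:
            print(e)  # e.g. "source code string cannot contain null bytes"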
---
 airflow/dag_processing/processor.py    | 53 +++++++++++++++++++++++++---------
 tests/dag_processing/test_processor.py | 17 +++++++++++
 2 files changed, 57 insertions(+), 13 deletions(-)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 442431afb5..0684bf60f9 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -23,10 +23,11 @@ import os
 import signal
 import threading
 import time
+import zipfile
 from contextlib import redirect_stderr, redirect_stdout, suppress
 from datetime import datetime, timedelta
 from multiprocessing.connection import Connection as MultiprocessingConnection
-from typing import TYPE_CHECKING, Iterator
+from typing import TYPE_CHECKING, Iterable, Iterator
 
 from setproctitle import setproctitle
 from sqlalchemy import exc, func, or_
@@ -51,7 +52,7 @@ from airflow.models.taskinstance import TaskInstance as TI
 from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.email import get_email_address_list, send_email
-from airflow.utils.file import iter_airflow_imports
+from airflow.utils.file import iter_airflow_imports, might_contain_dag
 from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
 from airflow.utils.mixins import MultiprocessingStartMethodMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -193,18 +194,23 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
             # Read the file to pre-import airflow modules used.
             # This prevents them from being re-imported from zero in each "processing" process
             # and saves CPU time and memory.
-            for module in iter_airflow_imports(self.file_path):
+            zip_file_paths = []
+            if zipfile.is_zipfile(self.file_path):
                 try:
-                    importlib.import_module(module)
-                except Exception as e:
-                    # only log as warning because an error here is not preventing anything from working, and
-                    # if it's serious, it's going to be surfaced to the user when the dag is actually parsed.
-                    self.log.warning(
-                        "Error when trying to pre-import module '%s' found in 
%s: %s",
-                        module,
-                        self.file_path,
-                        e,
-                    )
+                    with zipfile.ZipFile(self.file_path) as z:
+                        zip_file_paths.extend(
+                            [
+                                os.path.join(self.file_path, info.filename)
+                                for info in z.infolist()
+                                if might_contain_dag(info.filename, True, z)
+                            ]
+                        )
+                except zipfile.BadZipFile as err:
+                    self.log.error("There was an error accessing %s: %s", self.file_path, err)
+            if zip_file_paths:
+                self.import_modules(zip_file_paths)
+            else:
+                self.import_modules(self.file_path)
 
         context = self._get_multiprocessing_context()
 
@@ -355,6 +361,27 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
     def waitable_handle(self):
         return self._process.sentinel
 
+    def import_modules(self, file_path: str | Iterable[str]):
+        def _import_modules(filepath):
+            for module in iter_airflow_imports(filepath):
+                try:
+                    importlib.import_module(module)
+                except Exception as e:
+                    # only log as warning because an error here is not preventing anything from working, and
+                    # if it's serious, it's going to be surfaced to the user when the dag is actually parsed.
+                    self.log.warning(
+                        "Error when trying to pre-import module '%s' found in 
%s: %s",
+                        module,
+                        file_path,
+                        e,
+                    )
+
+        if isinstance(file_path, str):
+            _import_modules(file_path)
+        elif isinstance(file_path, Iterable):
+            for path in file_path:
+                _import_modules(path)
+
 
 class DagFileProcessor(LoggingMixin):
     """
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index a574dc88a5..b1f4bad11d 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -920,6 +920,23 @@ class TestDagFileProcessor:
         )
         mock_redirect_stdout_for_file.assert_called_once()
 
+    @mock.patch("airflow.dag_processing.processor.settings.dispose_orm", 
MagicMock)
+    @mock.patch.object(DagFileProcessorProcess, "_get_multiprocessing_context")
+    def test_no_valueerror_with_parseable_dag_in_zip(self, mock_context, tmpdir):
+        mock_context.return_value.Pipe.return_value = (MagicMock(), MagicMock())
+        zip_filename = os.path.join(tmpdir, "test_zip.zip")
+        with ZipFile(zip_filename, "w") as zip_file:
+            zip_file.writestr(TEMP_DAG_FILENAME, PARSEABLE_DAG_FILE_CONTENTS)
+
+        processor = DagFileProcessorProcess(
+            file_path=zip_filename,
+            pickle_dags=False,
+            dag_ids=[],
+            dag_directory=[],
+            callback_requests=[],
+        )
+        processor.start()
+
 
 class TestProcessorAgent:
     @pytest.fixture(autouse=True)
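
A note on the paths the patch builds: import_modules receives entries such as
"/dags/example.zip/my_dag.py", i.e. the archive path joined with the member
name. Such combined paths only work because Airflow's file helpers understand
them. A rough standalone sketch of reading one (read_maybe_zipped is a
hypothetical name, loosely modelled on airflow.utils.file.open_maybe_zipped):

    import os
    import zipfile

    def read_maybe_zipped(path: str) -> bytes:
        # Split "/dags/example.zip/my_dag.py" into archive and member parts.
        archive, sep, member = path.partition(".zip" + os.sep)
        if sep:
            with zipfile.ZipFile(archive + ".zip") as z:
                return z.read(member.replace(os.sep, "/"))
        # A plain file on disk: read it directly.
        with open(path, "rb") as f:
            return f.read()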
