This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 381320832dc661dfbb8cba8ddb3383b52f05b516
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu May 4 18:18:28 2023 +0100

    Fix issue in pre-importing modules in zipfile (#31061)

    This fixes a scheduler crash caused by reading a zipped file as source
    code when trying to pre-import airflow modules from the python modules
    in the zip file. The fix is to list the python modules inside the zip
    file and then pre-import the airflow modules used in each of them.

    apply suggestions from code review

    (cherry picked from commit 34b6230f3c7815b8ae7e99443e45a56921059d3f)
---
 airflow/dag_processing/processor.py    | 53 +++++++++++++++++++++++++---------
 tests/dag_processing/test_processor.py | 17 +++++++++++
 2 files changed, 57 insertions(+), 13 deletions(-)

diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 442431afb5..0684bf60f9 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -23,10 +23,11 @@ import os
 import signal
 import threading
 import time
+import zipfile
 from contextlib import redirect_stderr, redirect_stdout, suppress
 from datetime import datetime, timedelta
 from multiprocessing.connection import Connection as MultiprocessingConnection
-from typing import TYPE_CHECKING, Iterator
+from typing import TYPE_CHECKING, Iterable, Iterator
 
 from setproctitle import setproctitle
 from sqlalchemy import exc, func, or_
@@ -51,7 +52,7 @@ from airflow.models.taskinstance import TaskInstance as TI
 from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.email import get_email_address_list, send_email
-from airflow.utils.file import iter_airflow_imports
+from airflow.utils.file import iter_airflow_imports, might_contain_dag
 from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
 from airflow.utils.mixins import MultiprocessingStartMethodMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -193,18 +194,23 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
         # Read the file to pre-import airflow modules used.
         # This prevents them from being re-imported from zero in each "processing" process
         # and saves CPU time and memory.
-        for module in iter_airflow_imports(self.file_path):
+        zip_file_paths = []
+        if zipfile.is_zipfile(self.file_path):
             try:
-                importlib.import_module(module)
-            except Exception as e:
-                # only log as warning because an error here is not preventing anything from working, and
-                # if it's serious, it's going to be surfaced to the user when the dag is actually parsed.
-                self.log.warning(
-                    "Error when trying to pre-import module '%s' found in %s: %s",
-                    module,
-                    self.file_path,
-                    e,
-                )
+                with zipfile.ZipFile(self.file_path) as z:
+                    zip_file_paths.extend(
+                        [
+                            os.path.join(self.file_path, info.filename)
+                            for info in z.infolist()
+                            if might_contain_dag(info.filename, True, z)
+                        ]
+                    )
+            except zipfile.BadZipFile as err:
+                self.log.error("There was an err accessing %s, %s", self.file_path, err)
+        if zip_file_paths:
+            self.import_modules(zip_file_paths)
+        else:
+            self.import_modules(self.file_path)
 
         context = self._get_multiprocessing_context()
 
@@ -355,6 +361,27 @@ class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
     def waitable_handle(self):
         return self._process.sentinel
 
+    def import_modules(self, file_path: str | Iterable[str]):
+        def _import_modules(filepath):
+            for module in iter_airflow_imports(filepath):
+                try:
+                    importlib.import_module(module)
+                except Exception as e:
+                    # only log as warning because an error here is not preventing anything from working, and
+                    # if it's serious, it's going to be surfaced to the user when the dag is actually parsed.
+                    self.log.warning(
+                        "Error when trying to pre-import module '%s' found in %s: %s",
+                        module,
+                        file_path,
+                        e,
+                    )
+
+        if isinstance(file_path, str):
+            _import_modules(file_path)
+        elif isinstance(file_path, Iterable):
+            for path in file_path:
+                _import_modules(path)
+
 
 class DagFileProcessor(LoggingMixin):
     """
diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py
index a574dc88a5..b1f4bad11d 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -920,6 +920,23 @@ class TestDagFileProcessor:
         )
         mock_redirect_stdout_for_file.assert_called_once()
 
+    @mock.patch("airflow.dag_processing.processor.settings.dispose_orm", MagicMock)
+    @mock.patch.object(DagFileProcessorProcess, "_get_multiprocessing_context")
+    def test_no_valueerror_with_parseable_dag_in_zip(self, mock_context, tmpdir):
+        mock_context.return_value.Pipe.return_value = (MagicMock(), MagicMock())
+        zip_filename = os.path.join(tmpdir, "test_zip.zip")
+        with ZipFile(zip_filename, "w") as zip_file:
+            zip_file.writestr(TEMP_DAG_FILENAME, PARSEABLE_DAG_FILE_CONTENTS)
+
+        processor = DagFileProcessorProcess(
+            file_path=zip_filename,
+            pickle_dags=False,
+            dag_ids=[],
+            dag_directory=[],
+            callback_requests=[],
+        )
+        processor.start()
+
 
 class TestProcessorAgent:
     @pytest.fixture(autouse=True)
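For context on the crash itself: a zip archive begins with a binary header that contains null bytes, and Python's parser rejects such input with a ValueError. Below is a minimal sketch of the failure mode, assuming the pre-import helper hands the whole file to ast.parse (the file name and DAG contents are illustrative, not taken from the commit):

    import ast
    import zipfile

    # Build a zip archive containing a perfectly valid DAG file.
    with zipfile.ZipFile("dags.zip", "w") as zf:
        zf.writestr("my_dag.py", "from airflow import DAG\n")

    # Parsing the archive itself as Python source is what happened
    # before this fix; the zip's binary header trips up the parser.
    try:
        with open("dags.zip", "rb") as f:
            ast.parse(f.read())
    except ValueError as e:
        print(e)  # "source code string cannot contain null bytes"

This is the ValueError that the new test, test_no_valueerror_with_parseable_dag_in_zip, guards against.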

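For readers who want the shape of the new code path outside of Airflow, here is a simplified, self-contained sketch. The .py-suffix filter and the AST walk are crude stand-ins for might_contain_dag() and iter_airflow_imports() from airflow.utils.file; the function name and heuristics are illustrative only:

    import ast
    import importlib
    import zipfile

    def pre_import_airflow_modules(file_path: str) -> None:
        # Collect Python sources: every plausible module inside a zip,
        # or the file itself when it is a plain Python file.
        if zipfile.is_zipfile(file_path):
            with zipfile.ZipFile(file_path) as z:
                sources = [
                    z.read(info.filename)
                    for info in z.infolist()
                    if info.filename.endswith(".py")  # stand-in for might_contain_dag()
                ]
        else:
            with open(file_path, "rb") as f:
                sources = [f.read()]

        for source in sources:
            # Stand-in for iter_airflow_imports(): find top-level
            # "import airflow.x" statements and import them eagerly.
            for node in ast.walk(ast.parse(source)):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        if alias.name.startswith("airflow."):
                            try:
                                importlib.import_module(alias.name)
                            except Exception:
                                pass  # real failures surface when the DAG is parsed

The point, as in the committed import_modules() helper, is that each Python source inside the archive is parsed individually instead of the raw archive bytes being handed to the parser.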