This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch py-client-sync in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 74e32aa8d9f7b2febb864399489f858f0f6d6be9 Author: GPK <[email protected]> AuthorDate: Tue Mar 24 16:23:56 2026 +0000 Fix zip DAG import errors being cleared during bundle refresh (#63617) * Fix zip DAG import errors being cleared during bundle refresh * Fixup tests * Resolve comments * Update types * Resolve comments --- airflow-core/src/airflow/dag_processing/manager.py | 35 +++--- airflow-core/src/airflow/models/dag.py | 2 +- .../tests/unit/dag_processing/test_manager.py | 124 ++++++++++++++++++++- airflow-core/tests/unit/models/test_dag.py | 2 +- 4 files changed, 142 insertions(+), 21 deletions(-) diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index 10b29c0560e..7d4e20adfa8 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -690,10 +690,11 @@ class DagFileProcessorManager(LoggingMixin): known_files[bundle.name] = found_files - self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files) + observed_filelocs = self._get_observed_filelocs(found_files) + self.deactivate_deleted_dags(bundle_name=bundle.name, observed_filelocs=observed_filelocs) self.clear_orphaned_import_errors( bundle_name=bundle.name, - observed_filelocs={str(x.rel_path) for x in found_files}, # todo: make relative + observed_filelocs=observed_filelocs, ) if any_refreshed: @@ -710,17 +711,17 @@ class DagFileProcessorManager(LoggingMixin): return rel_paths - def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None: - """Deactivate DAGs that come from files that are no longer present in bundle.""" - - def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]: - """ - Find dag files in zip file located at abs_path. + def _get_observed_filelocs(self, present: set[DagFileInfo]) -> set[str]: + """ + Return observed DAG source paths for bundle entries. - We return the abs "paths" formed by joining the relative path inside the zip - with the path to the zip. + For regular files this includes the relative file path. + For ZIP archives this includes DAG-like inner paths such as + ``archive.zip/dag.py``. + """ - """ + def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]: + """Yield absolute paths for DAG-like files inside a ZIP archive.""" try: with zipfile.ZipFile(abs_path) as z: for info in z.infolist(): @@ -729,22 +730,26 @@ class DagFileProcessorManager(LoggingMixin): except zipfile.BadZipFile: self.log.exception("There was an error accessing ZIP file %s", abs_path) - rel_filelocs: list[str] = [] + observed_filelocs: set[str] = set() for info in present: abs_path = str(info.absolute_path) if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path): - rel_filelocs.append(str(info.rel_path)) + observed_filelocs.add(str(info.rel_path)) else: if TYPE_CHECKING: assert info.bundle_path for abs_sub_path in find_zipped_dags(abs_path=info.absolute_path): rel_sub_path = Path(abs_sub_path).relative_to(info.bundle_path) - rel_filelocs.append(str(rel_sub_path)) + observed_filelocs.add(str(rel_sub_path)) + return observed_filelocs + + def deactivate_deleted_dags(self, bundle_name: str, observed_filelocs: set[str]) -> None: + """Deactivate DAGs that come from files that are no longer present in bundle.""" with create_session() as session: any_deactivated = DagModel.deactivate_deleted_dags( bundle_name=bundle_name, - rel_filelocs=rel_filelocs, + rel_filelocs=observed_filelocs, session=session, ) # Only run cleanup if we actually deactivated any DAGs diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py index 677fbc26048..d4a74b6cc24 100644 --- a/airflow-core/src/airflow/models/dag.py +++ b/airflow-core/src/airflow/models/dag.py @@ -588,7 +588,7 @@ class DagModel(Base): def deactivate_deleted_dags( cls, bundle_name: str, - rel_filelocs: list[str], + rel_filelocs: set[str], session: Session = NEW_SESSION, ) -> bool: """ diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 40a8c4f0b66..e728ce85524 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -26,6 +26,7 @@ import shutil import signal import textwrap import time +import zipfile from collections import defaultdict, deque from datetime import datetime, timedelta from pathlib import Path @@ -110,6 +111,39 @@ def encode_mtime_in_filename(val): return out +def _create_zip_bundle_with_valid_and_broken_dags(zip_path: Path) -> None: + with zipfile.ZipFile(zip_path, "w") as zf: + zf.writestr( + "valid_dag.py", + textwrap.dedent( + """ + from datetime import datetime + + from airflow.providers.standard.operators.empty import EmptyOperator + from airflow.sdk import DAG + + with DAG( + dag_id="zip_valid_dag", + start_date=datetime(2024, 1, 1), + schedule=None, + catchup=False, + ): + EmptyOperator(task_id="task") + """ + ), + ) + zf.writestr( + "broken_dag.py", + textwrap.dedent( + """ + from airflow.sdk import DAG + + raise RuntimeError("broken zip dag") + """ + ), + ) + + class TestDagFileProcessorManager: @pytest.fixture(autouse=True) def _disable_examples(self): @@ -191,6 +225,88 @@ class TestDagFileProcessorManager: assert len(import_errors) == 0 session.rollback() + @pytest.mark.usefixtures("clear_parse_import_errors") + def test_clear_orphaned_import_errors_keeps_zip_inner_file_errors(self, session, tmp_path): + zip_path = tmp_path / "test_zip.zip" + _create_zip_bundle_with_valid_and_broken_dags(zip_path) + + session.add( + ParseImportError( + filename="test_zip.zip/broken_dag.py", + bundle_name="testing", + timestamp=timezone.utcnow(), + stacktrace="zip import error", + ) + ) + session.flush() + + manager = DagFileProcessorManager(max_runs=1) + manager.clear_orphaned_import_errors( + bundle_name="testing", + observed_filelocs=manager._get_observed_filelocs( + { + DagFileInfo( + bundle_name="testing", + rel_path=Path("test_zip.zip"), + bundle_path=tmp_path, + ) + } + ), + session=session, + ) + session.flush() + + import_errors = session.scalars(select(ParseImportError)).all() + assert len(import_errors) == 1 + assert import_errors[0].filename == "test_zip.zip/broken_dag.py" + + def test_get_observed_filelocs_expands_zip_inner_paths(self, tmp_path): + zip_path = tmp_path / "test_zip.zip" + _create_zip_bundle_with_valid_and_broken_dags(zip_path) + + manager = DagFileProcessorManager(max_runs=1) + observed_filelocs = manager._get_observed_filelocs( + { + DagFileInfo( + bundle_name="testing", + rel_path=Path("test_zip.zip"), + bundle_path=tmp_path, + ) + } + ) + + assert observed_filelocs == { + "test_zip.zip/valid_dag.py", + "test_zip.zip/broken_dag.py", + } + + @pytest.mark.usefixtures("clear_parse_import_errors") + def test_refresh_dag_bundles_keeps_zip_inner_file_errors(self, session, tmp_path, configure_dag_bundles): + bundle_path = tmp_path / "bundleone" + bundle_path.mkdir() + zip_path = bundle_path / "test_zip.zip" + _create_zip_bundle_with_valid_and_broken_dags(zip_path) + + session.add( + ParseImportError( + filename="test_zip.zip/broken_dag.py", + bundle_name="bundleone", + timestamp=timezone.utcnow(), + stacktrace="zip import error", + ) + ) + session.flush() + + with configure_dag_bundles({"bundleone": bundle_path}): + DagBundlesManager().sync_bundles_to_db() + manager = DagFileProcessorManager(max_runs=1) + manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) + manager._refresh_dag_bundles({}) + + import_errors = session.scalars(select(ParseImportError)).all() + assert len(import_errors) == 1 + assert import_errors[0].filename == "test_zip.zip/broken_dag.py" + @conf_vars({("core", "load_examples"): "False"}) def test_max_runs_when_no_files(self, tmp_path): with conf_vars({("core", "dags_folder"): str(tmp_path)}): @@ -881,7 +997,7 @@ class TestDagFileProcessorManager: ] manager = DagFileProcessorManager(max_runs=1) - manager.deactivate_deleted_dags("dag_maker", active_files) + manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files))) # The DAG from test_dag1.py is still active assert session.get(DagModel, "test_dag1").is_stale is False @@ -892,14 +1008,14 @@ class TestDagFileProcessorManager: ("rel_filelocs", "expected_return", "expected_dag1_stale", "expected_dag2_stale"), [ pytest.param( - ["test_dag1.py"], # Only dag1 present, dag2 deleted + {"test_dag1.py"}, # Only dag1 present, dag2 deleted True, # Should return True False, # dag1 should not be stale True, # dag2 should be stale id="dags_deactivated", ), pytest.param( - ["test_dag1.py", "test_dag2.py"], # Both files present + {"test_dag1.py", "test_dag2.py"}, # Both files present False, # Should return False False, # dag1 should not be stale False, # dag2 should not be stale @@ -972,7 +1088,7 @@ class TestDagFileProcessorManager: dag_maker.sync_dagbag_to_db() manager = DagFileProcessorManager(max_runs=1) - manager.deactivate_deleted_dags("dag_maker", active_files) + manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files))) if should_call_cleanup: mock_remove_references.assert_called_once() diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 046c85ea799..11a1c4818fc 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -842,7 +842,7 @@ class TestDag: DagModel.deactivate_deleted_dags( bundle_name=orm_dag.bundle_name, - rel_filelocs=list_py_file_paths(settings.DAGS_FOLDER), + rel_filelocs=set(list_py_file_paths(settings.DAGS_FOLDER)), ) orm_dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))
