Copilot commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2662429101
##########
airflow-core/tests/unit/dag_processing/test_dagbag.py:
##########
@@ -842,41 +831,27 @@ def test_timeout_dag_errors_are_import_errors(self, tmp_path, caplog):
"""
Test that if the DAG contains Timeout error it will be still loaded to DB as import_errors
"""
- code_to_save = """
-# Define Dag to load
+ dag_file = tmp_path / "timeout_dag.py"
+ dag_file.write_text("""
import datetime
import time
import airflow
from airflow.providers.standard.operators.python import PythonOperator
-time.sleep(1)
+time.sleep(1) # This will cause timeout during import
Review Comment:
The comment incorrectly implies that the sleep alone causes a timeout during
import; the timeout is actually configured as 0.01 seconds (line 851), and the
1 second sleep simply exceeds it. The comment should state that the sleep
exceeds the configured timeout threshold.
```suggestion
time.sleep(1) # This sleep exceeds the configured DAGBAG_IMPORT_TIMEOUT during import
```
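For readers less familiar with the mechanism, here is a tiny standalone sketch (plain Python, not code from this PR; the 0.01 s value is taken from the test's configuration) showing how a short SIGALRM timer interrupts a 1 second top-level sleep, which is how the slow import ends up recorded as an import error:
```python
import signal
import time


def _raise_timeout(signum, frame):
    # Stand-in for the AirflowTaskTimeout raised on DagBag import timeout.
    raise TimeoutError("import took longer than the configured timeout")


signal.signal(signal.SIGALRM, _raise_timeout)
signal.setitimer(signal.ITIMER_REAL, 0.01)  # plays the role of dagbag_import_timeout=0.01
try:
    time.sleep(1)  # the DAG file's top-level sleep; exceeds the 0.01 s timer
except TimeoutError as exc:
    print(f"caught: {exc}")  # in the DagBag this message ends up in import_errors
finally:
    signal.setitimer(signal.ITIMER_REAL, 0)  # always cancel the timer
```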
##########
airflow-core/src/airflow/dag_processing/importers/base.py:
##########
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Abstract base class for DAG importers."""
+
+from __future__ import annotations
+
+import logging
+import os
+import threading
+from abc import ABC, abstractmethod
+from collections.abc import Iterator
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from airflow.sdk import DAG
+
+log = logging.getLogger(__name__)
+
+
+@dataclass
+class DagImportError:
+ """Structured error information for DAG import failures."""
+
+ file_path: str
+ message: str
+ error_type: str = "import"
+ line_number: int | None = None
+ column_number: int | None = None
+ context: str | None = None
+ suggestion: str | None = None
+ stacktrace: str | None = None
+
+ def format_message(self) -> str:
+ """Format the error as a human-readable string."""
+ parts = [f"Error in {self.file_path}"]
+ if self.line_number is not None:
+ loc = f"line {self.line_number}"
+ if self.column_number is not None:
+ loc += f", column {self.column_number}"
+ parts.append(f"Location: {loc}")
+ parts.append(f"Error ({self.error_type}): {self.message}")
+ if self.context:
+ parts.append(f"Context:\n{self.context}")
+ if self.suggestion:
+ parts.append(f"Suggestion: {self.suggestion}")
+ return "\n".join(parts)
+
+
+@dataclass
+class DagImportWarning:
+ """Warning information for non-fatal issues during DAG import."""
+
+ file_path: str
+ message: str
+ warning_type: str = "general"
+ line_number: int | None = None
+
+
+@dataclass
+class DagImportResult:
+ """Result of importing DAGs from a file."""
+
+ file_path: str
+ dags: list[DAG] = field(default_factory=list)
+ errors: list[DagImportError] = field(default_factory=list)
+ skipped_files: list[str] = field(default_factory=list)
+ warnings: list[DagImportWarning] = field(default_factory=list)
+
+ @property
+ def success(self) -> bool:
+ """Return True if no fatal errors occurred."""
+ return len(self.errors) == 0
+
+
+class AbstractDagImporter(ABC):
+ """Abstract base class for DAG importers."""
+
+ @classmethod
+ @abstractmethod
+ def supported_extensions(cls) -> list[str]:
+        """Return file extensions this importer handles (e.g., ['.py', '.zip'])."""
+
+ @abstractmethod
+ def import_file(
+ self,
+ file_path: str | Path,
+ *,
+ bundle_path: Path | None = None,
+ bundle_name: str | None = None,
+ safe_mode: bool = True,
+ ) -> DagImportResult:
+ """Import DAGs from a file."""
+
+ def can_handle(self, file_path: str | Path) -> bool:
+ """Check if this importer can handle the given file."""
+ path = Path(file_path) if isinstance(file_path, str) else file_path
+ return path.suffix.lower() in self.supported_extensions()
+
+    def get_relative_path(self, file_path: str | Path, bundle_path: Path | None) -> str:
+ """Get the relative file path from the bundle root."""
+ if bundle_path is None:
+ return str(file_path)
+ try:
+ return str(Path(file_path).relative_to(bundle_path))
+ except ValueError:
+ return str(file_path)
+
+ def list_dag_files(
+ self,
+ directory: str | os.PathLike[str],
+ safe_mode: bool = True,
+ ) -> Iterator[str]:
+ """
+ List DAG files in a directory that this importer can handle.
+
+ Override this method to customize file discovery for your importer.
+ The default implementation finds files matching supported_extensions()
+ and respects .airflowignore files.
+
+ :param directory: Directory to search for DAG files
+ :param safe_mode: Whether to use heuristics to filter non-DAG files
+ :return: Iterator of file paths
+ """
+        from airflow._shared.module_loading.file_discovery import find_path_from_directory
+ from airflow.configuration import conf
+ from airflow.utils.file import might_contain_dag
+
+        ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
+ supported_exts = [ext.lower() for ext in self.supported_extensions()]
+
+        for file_path in find_path_from_directory(directory, ".airflowignore", ignore_file_syntax):
+ path = Path(file_path)
+
+ if not path.is_file():
+ continue
+
+ # Check if this importer handles this file extension
+ if path.suffix.lower() not in supported_exts:
+ continue
+
+ # Apply safe_mode heuristic if enabled
+ if safe_mode and not might_contain_dag(file_path, safe_mode):
+ continue
+
+ yield file_path
+
+
+class DagImporterRegistry:
+ """
+    Registry for DAG importers. Singleton that manages importers by file extension.
+
+    Each file extension can only be handled by one importer at a time. If multiple
+    importers claim the same extension, the last registered one wins and a warning
+ is logged. The built-in PythonDagImporter handles .py and .zip extensions.
+ """
+
+ _instance: DagImporterRegistry | None = None
Review Comment:
Class variable `_instance` should be defined inside `__init__` or carry a
proper class-level type annotation (e.g., `ClassVar`). Its current placement
after the docstring but before `__new__` is unconventional. Consider moving it
to be the first class variable or initializing it properly in `__new__`.
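As one possible way to address this (a sketch assuming the `__new__`-based singleton the comment refers to, not the PR author's exact code), the attribute can be annotated with `typing.ClassVar` and kept as the first class-level definition:
```python
from __future__ import annotations

from typing import ClassVar


class DagImporterRegistry:
    """Registry for DAG importers, keyed by file extension."""

    # ClassVar documents that this is shared singleton state rather than an
    # instance attribute, and it stays the first class-level definition.
    _instance: ClassVar[DagImporterRegistry | None] = None

    def __new__(cls) -> DagImporterRegistry:
        # Hypothetical singleton constructor; the PR's actual __new__ may differ.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
```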
##########
airflow-core/src/airflow/dag_processing/importers/python_importer.py:
##########
@@ -0,0 +1,376 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Python DAG importer - imports DAGs from Python files."""
+
+from __future__ import annotations
+
+import contextlib
+import importlib
+import importlib.machinery
+import importlib.util
+import logging
+import os
+import signal
+import sys
+import traceback
+import warnings
+import zipfile
+from collections.abc import Iterator
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.dag_processing.importers.base import (
+ AbstractDagImporter,
+ DagImportError,
+ DagImportResult,
+ DagImportWarning,
+)
+from airflow.utils.docs import get_docs_url
+from airflow.utils.file import get_unique_dag_module_name, might_contain_dag
+
+if TYPE_CHECKING:
+ from types import ModuleType
+
+ from airflow.sdk import DAG
+
+log = logging.getLogger(__name__)
+
+
+@contextlib.contextmanager
+def _timeout(seconds: float = 1, error_message: str = "Timeout"):
+ """Context manager for timing out operations."""
+ error_message = error_message + ", PID: " + str(os.getpid())
+
+ def handle_timeout(signum, frame):
+ log.error("Process timed out, PID: %s", str(os.getpid()))
+ from airflow.sdk.exceptions import AirflowTaskTimeout
+
+ raise AirflowTaskTimeout(error_message)
+
+ try:
+ try:
+ signal.signal(signal.SIGALRM, handle_timeout)
+ signal.setitimer(signal.ITIMER_REAL, seconds)
+ except ValueError:
+            log.warning("timeout can't be used in the current context", exc_info=True)
+ yield
+ finally:
+ with contextlib.suppress(ValueError):
+ signal.setitimer(signal.ITIMER_REAL, 0)
+
+
+class PythonDagImporter(AbstractDagImporter):
+ """
+ Importer for Python DAG files and zip archives containing Python DAGs.
+
+    This is the default importer registered with the DagImporterRegistry. It handles:
+ - .py files: Standard Python DAG files
+ - .zip files: ZIP archives containing Python DAG files
+
+    Note: The .zip extension is exclusively owned by this importer. If you need to
+    support other file formats inside ZIP archives (e.g., YAML), you would need to
+    either extend this importer or create a composite importer that delegates based
+ on the contents of the archive.
+ """
+
+ @classmethod
+ def supported_extensions(cls) -> list[str]:
+ """Return file extensions handled by this importer (.py and .zip)."""
+ return [".py", ".zip"]
+
+ def list_dag_files(
+ self,
+ directory: str | os.PathLike[str],
+ safe_mode: bool = True,
+ ) -> Iterator[str]:
+ """
+ List Python DAG files in a directory.
+
+ Handles both .py files and .zip archives containing Python DAGs.
+ Respects .airflowignore files in the directory tree.
+ """
+        from airflow._shared.module_loading.file_discovery import find_path_from_directory
+ from airflow.utils.file import might_contain_dag
+
+        ignore_file_syntax = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="glob")
+
+        for file_path in find_path_from_directory(directory, ".airflowignore", ignore_file_syntax):
+ path = Path(file_path)
+ try:
+                if path.is_file() and (path.suffix.lower() == ".py" or zipfile.is_zipfile(path)):
+ if might_contain_dag(file_path, safe_mode):
+ yield file_path
+ except Exception:
+ log.exception("Error while examining %s", file_path)
+
+ def import_file(
+ self,
+ file_path: str | Path,
+ *,
+ bundle_path: Path | None = None,
+ bundle_name: str | None = None,
+ safe_mode: bool = True,
+ ) -> DagImportResult:
+ """
+ Import DAGs from a Python file or zip archive.
+
+ :param file_path: Path to the Python file to import.
+ :param bundle_path: Path to the bundle root.
+ :param bundle_name: Name of the bundle.
+        :param safe_mode: If True, skip files that don't appear to contain DAGs.
+ :return: DagImportResult with imported DAGs and any errors.
+ """
+ from airflow.sdk.definitions._internal.contextmanager import DagContext
+
+ filepath = str(file_path)
+ relative_path = self.get_relative_path(filepath, bundle_path)
+ result = DagImportResult(file_path=relative_path)
+
+ if not os.path.isfile(filepath):
+ result.errors.append(
+ DagImportError(
+ file_path=relative_path,
+ message=f"File not found: {filepath}",
+ error_type="file_not_found",
+ )
+ )
+ return result
+
+ # Clear any autoregistered dags from previous imports
+ DagContext.autoregistered_dags.clear()
+
+ # Capture warnings during import
+ captured_warnings: list[warnings.WarningMessage] = []
Review Comment:
The variable `captured_warnings` is initialized as an empty list but then
immediately reassigned in the context manager. The initial assignment on line
159 is unnecessary and could be confusing. Remove the initialization and rely
on the context manager assignment.
```suggestion
```
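For context, this is the standard-library pattern the comment alludes to (a generic sketch, not the PR's exact code): `warnings.catch_warnings(record=True)` binds the recording list itself, so a pre-initialized empty list is shadowed immediately:
```python
import warnings


def collect_warning_messages() -> list[str]:
    # The context manager returns the list that records the warnings, so there
    # is no need for a prior `captured_warnings = []` assignment.
    with warnings.catch_warnings(record=True) as captured_warnings:
        warnings.simplefilter("always")
        warnings.warn("example DAG-level deprecation", DeprecationWarning)
    return [str(w.message) for w in captured_warnings]


print(collect_warning_messages())  # ['example DAG-level deprecation']
```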
##########
airflow-core/src/airflow/dag_processing/dagbag.py:
##########
@@ -403,187 +367,29 @@ def _get_relative_fileloc(self, filepath: str) -> str:
return str(Path(filepath).relative_to(self.bundle_path))
return filepath
- def _load_modules_from_file(self, filepath, safe_mode):
- from airflow.sdk.definitions._internal.contextmanager import DagContext
-
- def handler(signum, frame):
- """Handle SIGSEGV signal and let the user know that the import failed."""
- msg = f"Received SIGSEGV signal while processing {filepath}."
- self.log.error(msg)
- relative_filepath = self._get_relative_fileloc(filepath)
- self.import_errors[relative_filepath] = msg
-
- try:
- signal.signal(signal.SIGSEGV, handler)
- except ValueError:
- self.log.warning("SIGSEGV signal handler registration failed. Not in the main thread")
-
- if not might_contain_dag(filepath, safe_mode):
- # Don't want to spam user with skip messages
- if not self.has_logged:
- self.has_logged = True
- self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
- return []
-
- self.log.debug("Importing %s", filepath)
- mod_name = get_unique_dag_module_name(filepath)
-
- if mod_name in sys.modules:
- del sys.modules[mod_name]
-
- DagContext.current_autoregister_module_name = mod_name
-
- def parse(mod_name, filepath):
- try:
- loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
- spec = importlib.util.spec_from_loader(mod_name, loader)
- new_module = importlib.util.module_from_spec(spec)
- sys.modules[spec.name] = new_module
- loader.exec_module(new_module)
- return [new_module]
- except KeyboardInterrupt:
- # re-raise ctrl-c
- raise
- except BaseException as e:
- # Normally you shouldn't catch BaseException, but in this case we want to, as, pytest.skip
- # raises an exception which does not inherit from Exception, and we want to catch that here.
- # This would also catch `exit()` in a dag file
- DagContext.autoregistered_dags.clear()
- self.log.exception("Failed to import: %s", filepath)
- relative_filepath = self._get_relative_fileloc(filepath)
- if self.dagbag_import_error_tracebacks:
- self.import_errors[relative_filepath] = traceback.format_exc(
- limit=-self.dagbag_import_error_traceback_depth
- )
- else:
- self.import_errors[relative_filepath] = str(e)
- return []
-
- dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)
-
- if not isinstance(dagbag_import_timeout, (int, float)):
- raise TypeError(
- f"Value ({dagbag_import_timeout}) from get_dagbag_import_timeout must be int or float"
- )
-
- if dagbag_import_timeout <= 0: # no parsing timeout
- return parse(mod_name, filepath)
-
- timeout_msg = (
- f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n"
- "Please take a look at these docs to improve your DAG import time:\n"
- f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
- f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
- )
- with timeout(dagbag_import_timeout, error_message=timeout_msg):
- return parse(mod_name, filepath)
-
- def _load_modules_from_zip(self, filepath, safe_mode):
- from airflow.sdk.definitions._internal.contextmanager import DagContext
-
- mods = []
- with zipfile.ZipFile(filepath) as current_zip_file:
- for zip_info in current_zip_file.infolist():
- zip_path = Path(zip_info.filename)
- if zip_path.suffix not in [".py", ".pyc"] or len(zip_path.parts) > 1:
- continue
-
- if zip_path.stem == "__init__":
- self.log.warning("Found %s at root of %s", zip_path.name, filepath)
-
- self.log.debug("Reading %s from %s", zip_info.filename, filepath)
-
- if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
- # todo: create ignore list
- # Don't want to spam user with skip messages
- if not self.has_logged:
- self.has_logged = True
- self.log.info(
- "File %s:%s assumed to contain no DAGs. Skipping.", filepath, zip_info.filename
- )
- continue
-
- mod_name = zip_path.stem
- if mod_name in sys.modules:
- del sys.modules[mod_name]
-
- DagContext.current_autoregister_module_name = mod_name
- try:
- sys.path.insert(0, filepath)
- current_module = importlib.import_module(mod_name)
- mods.append(current_module)
- except Exception as e:
- DagContext.autoregistered_dags.clear()
- fileloc = os.path.join(filepath, zip_info.filename)
- self.log.exception("Failed to import: %s", fileloc)
- relative_fileloc = self._get_relative_fileloc(fileloc)
- if self.dagbag_import_error_tracebacks:
- self.import_errors[relative_fileloc] = traceback.format_exc(
- limit=-self.dagbag_import_error_traceback_depth
- )
- else:
- self.import_errors[relative_fileloc] = str(e)
- finally:
- if sys.path[0] == filepath:
- del sys.path[0]
- return mods
-
- def _process_modules(self, filepath, mods, file_last_changed_on_disk):
- from airflow.sdk import DAG
- from airflow.sdk.definitions._internal.contextmanager import DagContext
-
- top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG)}
-
- top_level_dags.update(DagContext.autoregistered_dags)
-
- DagContext.current_autoregister_module_name = None
- DagContext.autoregistered_dags.clear()
-
- found_dags = []
-
- for dag, mod in top_level_dags:
- dag.fileloc = mod.__file__
- relative_fileloc = self._get_relative_fileloc(dag.fileloc)
- dag.relative_fileloc = relative_fileloc
- try:
- dag.validate()
- _validate_executor_fields(dag, self.bundle_name)
- self.bag_dag(dag=dag)
- except AirflowClusterPolicySkipDag:
- pass
- except Exception as e:
- self.log.exception("Failed to bag_dag: %s", dag.fileloc)
- self.import_errors[relative_fileloc] = f"{type(e).__name__}: {e}"
- self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
- else:
- found_dags.append(dag)
- return found_dags
-
def bag_dag(self, dag: DAG):
"""
Add the DAG into the bag.
:raises: AirflowDagCycleException if a cycle is detected.
:raises: AirflowDagDuplicatedIdException if this dag already exists in the bag.
"""
- dag.check_cycle() # throws exception if a task cycle is found
-
+ dag.validate()
+ dag.check_cycle()
dag.resolve_template_files()
dag.last_loaded = timezone.utcnow()
+ _validate_executor_fields(dag, self.bundle_name)
+
try:
- # Check policies
settings.dag_policy(dag)
for task in dag.tasks:
- # The listeners are not supported when ending a task via a trigger on asynchronous operators.
if getattr(task, "end_from_trigger", False) and get_listener_manager().has_listeners:
- raise AirflowException(
- "Listeners are not supported with end_from_trigger=True for deferrable operators. "
- "Task %s in DAG %s has end_from_trigger=True with listeners from plugins. "
- "Set end_from_trigger=False to use listeners.",
- task.task_id,
- dag.dag_id,
+ raise AirflowClusterPolicyViolation(
+ f"Listeners are not supported with end_from_trigger=True for deferrable operators. "
+ f"Task {task.task_id} in DAG {dag.dag_id} has end_from_trigger=True with listeners from plugins. "
+ "Set end_from_trigger=False to use listeners."
Review Comment:
Changed exception type from `AirflowException` to
`AirflowClusterPolicyViolation`, but this error is not related to cluster
policy - it's about an incompatible feature combination (listeners +
end_from_trigger). This should remain as `AirflowException` or use a more
appropriate exception type.
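A minimal sketch of what keeping the original exception could look like (the loop is paraphrased from the diff; `listener_manager` is a stand-in for `get_listener_manager()` and this helper is hypothetical, not the PR author's code):
```python
from airflow.exceptions import AirflowException


def check_end_from_trigger_listeners(dag, listener_manager) -> None:
    # Hypothetical helper: this is a feature-compatibility check, not a
    # cluster-policy decision, so a plain AirflowException reads more
    # accurately than AirflowClusterPolicyViolation.
    for task in dag.tasks:
        if getattr(task, "end_from_trigger", False) and listener_manager.has_listeners:
            raise AirflowException(
                "Listeners are not supported with end_from_trigger=True for deferrable operators. "
                f"Task {task.task_id} in DAG {dag.dag_id} has end_from_trigger=True with listeners from plugins. "
                "Set end_from_trigger=False to use listeners."
            )
```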
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]