ikholopov-omni commented on code in PR #60127: URL: https://github.com/apache/airflow/pull/60127#discussion_r2662154326
########## airflow-core/src/airflow/dag_processing/importers/base.py: ########## @@ -0,0 +1,163 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Abstract base class for DAG importers.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from airflow.sdk import DAG + + +@dataclass +class DagImportError: + """Structured error information for DAG import failures.""" + + file_path: str + message: str + error_type: str = "import" + line_number: int | None = None + column_number: int | None = None + context: str | None = None + suggestion: str | None = None + stacktrace: str | None = None + + def format_message(self) -> str: + """Format the error as a human-readable string.""" + parts = [f"Error in {self.file_path}"] + if self.line_number is not None: + loc = f"line {self.line_number}" + if self.column_number is not None: + loc += f", column {self.column_number}" + parts.append(f"Location: {loc}") + parts.append(f"Error ({self.error_type}): {self.message}") + if self.context: + parts.append(f"Context:\n{self.context}") + if self.suggestion: + parts.append(f"Suggestion: 
{self.suggestion}") + return "\n".join(parts) + + +@dataclass +class DagImportWarning: + """Warning information for non-fatal issues during DAG import.""" + + file_path: str + message: str + warning_type: str = "general" + line_number: int | None = None + + +@dataclass +class DagImportResult: + """Result of importing DAGs from a file.""" + + file_path: str + dags: list[DAG] = field(default_factory=list) + errors: list[DagImportError] = field(default_factory=list) + skipped_files: list[str] = field(default_factory=list) + warnings: list[DagImportWarning] = field(default_factory=list) + + @property + def success(self) -> bool: + """Return True if no fatal errors occurred.""" + return len(self.errors) == 0 + + +class AbstractDagImporter(ABC): + """Abstract base class for DAG importers.""" + + @classmethod + @abstractmethod + def supported_extensions(cls) -> list[str]: + """Return file extensions this importer handles (e.g., ['.py', '.zip']).""" + + @abstractmethod + def import_file( + self, + file_path: str | Path, + *, + bundle_path: Path | None = None, + bundle_name: str | None = None, + safe_mode: bool = True, + ) -> DagImportResult: + """Import DAGs from a file.""" + + def can_handle(self, file_path: str | Path) -> bool: + """Check if this importer can handle the given file.""" + path = Path(file_path) if isinstance(file_path, str) else file_path + return path.suffix.lower() in self.supported_extensions() + + def get_relative_path(self, file_path: str | Path, bundle_path: Path | None) -> str: Review Comment: Is the goal of this method to allow importers to override how the file_path/bundle_path relationship is resolved? If not, it could probably be an independent function. ########## airflow-core/src/airflow/dag_processing/importers/base.py: ########## @@ -0,0 +1,163 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements.
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Abstract base class for DAG importers.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from airflow.sdk import DAG + + +@dataclass +class DagImportError: + """Structured error information for DAG import failures.""" + + file_path: str + message: str + error_type: str = "import" + line_number: int | None = None + column_number: int | None = None + context: str | None = None + suggestion: str | None = None + stacktrace: str | None = None + + def format_message(self) -> str: + """Format the error as a human-readable string.""" + parts = [f"Error in {self.file_path}"] + if self.line_number is not None: + loc = f"line {self.line_number}" + if self.column_number is not None: + loc += f", column {self.column_number}" + parts.append(f"Location: {loc}") + parts.append(f"Error ({self.error_type}): {self.message}") + if self.context: + parts.append(f"Context:\n{self.context}") + if self.suggestion: + parts.append(f"Suggestion: {self.suggestion}") + return "\n".join(parts) + + +@dataclass +class DagImportWarning: + """Warning information for non-fatal issues during DAG import.""" + + file_path: str + message: str + warning_type: str = 
"general" + line_number: int | None = None + + +@dataclass +class DagImportResult: + """Result of importing DAGs from a file.""" + + file_path: str + dags: list[DAG] = field(default_factory=list) + errors: list[DagImportError] = field(default_factory=list) + skipped_files: list[str] = field(default_factory=list) + warnings: list[DagImportWarning] = field(default_factory=list) + + @property + def success(self) -> bool: + """Return True if no fatal errors occurred.""" + return len(self.errors) == 0 + + +class AbstractDagImporter(ABC): + """Abstract base class for DAG importers.""" + + @classmethod + @abstractmethod + def supported_extensions(cls) -> list[str]: + """Return file extensions this importer handles (e.g., ['.py', '.zip']).""" + + @abstractmethod + def import_file( + self, + file_path: str | Path, + *, + bundle_path: Path | None = None, + bundle_name: str | None = None, + safe_mode: bool = True, + ) -> DagImportResult: + """Import DAGs from a file.""" + + def can_handle(self, file_path: str | Path) -> bool: + """Check if this importer can handle the given file.""" + path = Path(file_path) if isinstance(file_path, str) else file_path + return path.suffix.lower() in self.supported_extensions() + + def get_relative_path(self, file_path: str | Path, bundle_path: Path | None) -> str: + """Get the relative file path from the bundle root.""" + if bundle_path is None: + return str(file_path) + try: + return str(Path(file_path).relative_to(bundle_path)) + except ValueError: + return str(file_path) + + +class DagImporterRegistry: + """Registry for DAG importers. 
Singleton that manages importers by file extension.""" + + _instance: DagImporterRegistry | None = None + _importers: dict[str, AbstractDagImporter] + + def __new__(cls) -> DagImporterRegistry: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._importers = {} + cls._instance._register_default_importers() + return cls._instance + + def _register_default_importers(self) -> None: + from airflow.dag_processing.importers.python_importer import PythonDagImporter + + self.register(PythonDagImporter()) + + def register(self, importer: AbstractDagImporter) -> None: + """Register an importer for its supported extensions.""" + for ext in importer.supported_extensions(): + self._importers[ext.lower()] = importer Review Comment: From what I understand, the idea is to use file extensions to automatically determine the required importer. While this is viable, things will get messy almost immediately: for example, the `.zip` extension will be claimed by the Python importer, but I believe other importers (like a YamlImporter) should also be able to handle it. At the very least, we need some protection here that prevents accidentally overriding an extension that has already been claimed by another registered importer. Overall, keeping importers independent of other configuration (like Bundles) seems like a reasonable default; if we want to allow per-bundle customization (e.g. only import Python and ignore YAML files for bundle A, and only import YAML for bundle B), we can always add it later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
