kaxil commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2662421646


##########
airflow-core/src/airflow/dag_processing/importers/base.py:
##########
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Abstract base class for DAG importers."""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from airflow.sdk import DAG
+
+
+@dataclass
+class DagImportError:
+    """Structured error information for DAG import failures."""
+
+    file_path: str
+    message: str
+    error_type: str = "import"
+    line_number: int | None = None
+    column_number: int | None = None
+    context: str | None = None
+    suggestion: str | None = None
+    stacktrace: str | None = None
+
+    def format_message(self) -> str:
+        """Format the error as a human-readable string."""
+        parts = [f"Error in {self.file_path}"]
+        if self.line_number is not None:
+            loc = f"line {self.line_number}"
+            if self.column_number is not None:
+                loc += f", column {self.column_number}"
+            parts.append(f"Location: {loc}")
+        parts.append(f"Error ({self.error_type}): {self.message}")
+        if self.context:
+            parts.append(f"Context:\n{self.context}")
+        if self.suggestion:
+            parts.append(f"Suggestion: {self.suggestion}")
+        return "\n".join(parts)
+
+
+@dataclass
+class DagImportWarning:
+    """Warning information for non-fatal issues during DAG import."""
+
+    file_path: str
+    message: str
+    warning_type: str = "general"
+    line_number: int | None = None
+
+
+@dataclass
+class DagImportResult:
+    """Result of importing DAGs from a file."""
+
+    file_path: str
+    dags: list[DAG] = field(default_factory=list)
+    errors: list[DagImportError] = field(default_factory=list)
+    skipped_files: list[str] = field(default_factory=list)
+    warnings: list[DagImportWarning] = field(default_factory=list)
+
+    @property
+    def success(self) -> bool:
+        """Return True if no fatal errors occurred."""
+        return len(self.errors) == 0
+
+
+class AbstractDagImporter(ABC):
+    """Abstract base class for DAG importers."""
+
+    @classmethod
+    @abstractmethod
+    def supported_extensions(cls) -> list[str]:
+        """Return file extensions this importer handles (e.g., ['.py', 
'.zip'])."""
+
+    @abstractmethod
+    def import_file(
+        self,
+        file_path: str | Path,
+        *,
+        bundle_path: Path | None = None,
+        bundle_name: str | None = None,
+        safe_mode: bool = True,
+    ) -> DagImportResult:
+        """Import DAGs from a file."""
+
+    def can_handle(self, file_path: str | Path) -> bool:
+        """Check if this importer can handle the given file."""
+        path = Path(file_path) if isinstance(file_path, str) else file_path
+        return path.suffix.lower() in self.supported_extensions()
+
+    def get_relative_path(self, file_path: str | Path, bundle_path: Path | 
None) -> str:
+        """Get the relative file path from the bundle root."""
+        if bundle_path is None:
+            return str(file_path)
+        try:
+            return str(Path(file_path).relative_to(bundle_path))
+        except ValueError:
+            return str(file_path)
+
+
+class DagImporterRegistry:
+    """Registry for DAG importers. Singleton that manages importers by file 
extension."""
+
+    _instance: DagImporterRegistry | None = None
+    _importers: dict[str, AbstractDagImporter]
+
+    def __new__(cls) -> DagImporterRegistry:
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+            cls._instance._importers = {}
+            cls._instance._register_default_importers()
+        return cls._instance
+
+    def _register_default_importers(self) -> None:
+        from airflow.dag_processing.importers.python_importer import 
PythonDagImporter
+
+        self.register(PythonDagImporter())
+
+    def register(self, importer: AbstractDagImporter) -> None:
+        """Register an importer for its supported extensions."""
+        for ext in importer.supported_extensions():
+            self._importers[ext.lower()] = importer

Review Comment:
   Just added a warning on override: when registering an importer for an 
extension that's already claimed, we now log a warning: "Extension '.zip' 
already registered by PythonDagImporter, overriding with YamlImporter"
   
   and documented the limitation in `PythonDagImporter`'s docstring to explain 
that `.zip` is exclusively owned by it, noting that handling other formats 
inside a ZIP would require extending this importer or creating a composite 
importer. That said, I do not want to advertise this as a feature until AIP-85 
is done anyway :) 
   
   For now, I think the single-importer-per-extension model is simpler and 
sufficient. If we need multi-importer support for ZIP archives in the future, 
we could add a delegation mechanism where `PythonDagImporter` inspects the ZIP 
contents and delegates to other importers. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to