Copilot commented on code in PR #60127:
URL: https://github.com/apache/airflow/pull/60127#discussion_r2661971525


##########
airflow-core/src/airflow/dag_processing/dagbag.py:
##########
@@ -339,29 +331,62 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
             self.log.exception(e)
             return []
 
-        # Ensure we don't pick up anything else we didn't mean to
-        DagContext.autoregistered_dags.clear()
-
         self.captured_warnings.pop(filepath, None)
-        with _capture_with_reraise() as captured_warnings:
-            if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
-                mods = self._load_modules_from_file(filepath, safe_mode)
-            else:
-                mods = self._load_modules_from_zip(filepath, safe_mode)
 
-        if captured_warnings:
-            formatted_warnings = []
-            for msg in captured_warnings:
-                category = msg.category.__name__
-                if (module := msg.category.__module__) != "builtins":
-                    category = f"{module}.{category}"
-                formatted_warnings.append(f"{msg.filename}:{msg.lineno}: {category}: {msg.message}")
+        from airflow.dag_processing.importers import get_importer_registry
+
+        registry = get_importer_registry()
+        importer = registry.get_importer(filepath)
+
+        if importer is None:
+            self.log.debug("No importer found for file: %s", filepath)
+            return []
+
+        result = importer.import_file(
+            file_path=filepath,
+            bundle_path=Path(self.dag_folder) if self.dag_folder else None,
+            bundle_name=self.bundle_name,
+            safe_mode=safe_mode,
+        )
+
+        if result.skipped_files:
+            for skipped in result.skipped_files:
+                if not self.has_logged:
+                    self.has_logged = True
+                    self.log.info("File %s assumed to contain no DAGs. Skipping.", skipped)
+
+        if result.errors:
+            for error in result.errors:
+                # Use the file path from error for ZIP files (contains zip/file.py format)
+                # For regular files, use the original filepath
+                if zipfile.is_zipfile(filepath):
+                    error_path = error.file_path if error.file_path else filepath
+                else:
+                    error_path = filepath
+                error_msg = error.stacktrace if error.stacktrace else error.message
+                self.import_errors[error_path] = error_msg
+                self.log.error("Error loading DAG from %s: %s", error_path, error.message)
+
+        if result.warnings:
+            formatted_warnings = [
+                f"{w.file_path}:{w.line_number}: {w.warning_type}: {w.message}" for w in result.warnings
+            ]
             self.captured_warnings[filepath] = tuple(formatted_warnings)
 
-        found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)
+        bagged_dags = []
+        for dag in result.dags:
+            try:
+                # Only set fileloc if not already set by importer (ZIP files have path inside archive)
+                if not dag.fileloc:

Review Comment:
   The comment on line 379 states 'Only set fileloc if not already set by importer (ZIP files have path inside archive)', but the check `if not dag.fileloc:` does not distinguish "not set" from "set to a falsy value". If the importer explicitly sets a DAG's fileloc to an empty string (or any other falsy value), this condition will incorrectly overwrite it. Consider checking for `None` specifically, or clarify the expected behavior.
   ```suggestion
                   if dag.fileloc is None:
   ```



##########
airflow-core/src/airflow/dag_processing/importers/python_importer.py:
##########
@@ -0,0 +1,430 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Python DAG importer - imports DAGs from Python files."""
+
+from __future__ import annotations
+
+import contextlib
+import importlib
+import importlib.machinery
+import importlib.util
+import logging
+import os
+import signal
+import sys
+import traceback
+import warnings
+import zipfile
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from airflow import settings
+from airflow.configuration import conf
+from airflow.dag_processing.importers.base import (
+    AbstractDagImporter,
+    DagImportError,
+    DagImportResult,
+    DagImportWarning,
+)
+from airflow.exceptions import (
+    AirflowClusterPolicyError,
+    AirflowClusterPolicySkipDag,
+    AirflowClusterPolicyViolation,
+    AirflowDagDuplicatedIdException,
+    UnknownExecutorException,
+)
+from airflow.executors.executor_loader import ExecutorLoader
+from airflow.listeners.listener import get_listener_manager
+from airflow.utils.docs import get_docs_url
+from airflow.utils.file import get_unique_dag_module_name, might_contain_dag
+
+if TYPE_CHECKING:
+    from types import ModuleType
+
+    from airflow.sdk import DAG
+
+log = logging.getLogger(__name__)
+
+
+@contextlib.contextmanager
+def _timeout(seconds: float = 1, error_message: str = "Timeout"):
+    """Context manager for timing out operations."""
+    error_message = error_message + ", PID: " + str(os.getpid())
+
+    def handle_timeout(signum, frame):
+        log.error("Process timed out, PID: %s", str(os.getpid()))
+        from airflow.sdk.exceptions import AirflowTaskTimeout
+
+        raise AirflowTaskTimeout(error_message)
+
+    try:
+        try:
+            signal.signal(signal.SIGALRM, handle_timeout)
+            signal.setitimer(signal.ITIMER_REAL, seconds)
+        except ValueError:
+            log.warning("timeout can't be used in the current context", exc_info=True)
+        yield
+    finally:
+        with contextlib.suppress(ValueError):
+            signal.setitimer(signal.ITIMER_REAL, 0)
+
+
+def _executor_exists(executor_name: str, team_name: str | None) -> bool:
+    """Check if executor exists, with global fallback for teams."""
+    try:
+        ExecutorLoader.lookup_executor_name_by_str(executor_name, team_name=team_name)
+        return True
+    except UnknownExecutorException:
+        if team_name:
+            try:
+                ExecutorLoader.lookup_executor_name_by_str(executor_name, team_name=None)
+                return True
+            except UnknownExecutorException:
+                pass
+    return False
+
+
+def _validate_executor_fields(dag: DAG, bundle_name: str | None = None) -> None:
+    """Validate that executors specified in tasks are available."""
+    dag_team_name = None
+
+    if conf.getboolean("core", "multi_team"):
+        if bundle_name:
+            from airflow.dag_processing.bundles.manager import DagBundlesManager
+
+            bundle_manager = DagBundlesManager()
+            bundle_config = bundle_manager._bundle_config[bundle_name]
+            dag_team_name = bundle_config.team_name
+
+    for task in dag.tasks:
+        if not task.executor:
+            continue
+
+        if not _executor_exists(task.executor, dag_team_name):
+            if dag_team_name:
+                raise UnknownExecutorException(
+                    f"Task '{task.task_id}' specifies executor '{task.executor}', which is not available "
+                    f"for team '{dag_team_name}' (the team associated with DAG '{dag.dag_id}') or as a global executor."
+                )
+            raise UnknownExecutorException(
+                f"Task '{task.task_id}' specifies executor '{task.executor}', which is not available."
+            )
+
+
+class PythonDagImporter(AbstractDagImporter):
+    """Importer for Python DAG files and zip archives."""
+
+    @classmethod
+    def supported_extensions(cls) -> list[str]:
+        return [".py", ".zip", ""]  # Empty string for extension-less files
+

Review Comment:
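   The comment states 'Empty string for extension-less files', but listing an empty string as a supported file extension is easy to misread and may lead to unexpected behavior, since empty strings often indicate missing data rather than "no extension". Note that `os.path.splitext()` and `pathlib.PurePath.suffix` both return `""` for a name without an extension, so the value itself follows a standard-library convention; what is missing is documentation of that intent. Consider clarifying the intended handling of extension-less files or documenting why this approach is necessary.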
   ```suggestion
           """
           Return the file extensions supported by this importer.
   
           The empty string ("") is used as an explicit sentinel value to indicate
           files with no extension (e.g. a filename like "dagfile" without a
           trailing dot). Callers of this method should treat "" as "no extension",
           not as "missing data".
           """
           return [".py", ".zip", ""]  # "" is an explicit sentinel for extension-less files
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to