This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 35422f087ae Change leftover import error path record to relative path 
(#60967)
35422f087ae is described below

commit 35422f087aeffb65aa0044ed4a21ee0bb11680a1
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Jan 26 18:32:53 2026 +0100

    Change leftover import error path record to relative path (#60967)
    
    * Change leftover import error path record to relative path
    
    These too should use relative paths for import error
    
    * fixup! Change leftover import error path record to relative path
    
    * fixup! fixup! Change leftover import error path record to relative path
    
    * Apply suggestion from @ephraimbuddy
---
 airflow-core/src/airflow/dag_processing/dagbag.py  | 13 ++---
 .../src/airflow/dag_processing/processor.py        |  4 +-
 .../tests/unit/dag_processing/test_dagbag.py       | 64 ++++++++++++++++++++++
 3 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py 
b/airflow-core/src/airflow/dag_processing/dagbag.py
index 5c21254cf6e..649be758477 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -22,7 +22,6 @@ import os
 import sys
 import textwrap
 import warnings
-import zipfile
 from collections.abc import Generator
 from datetime import datetime, timedelta
 from pathlib import Path
@@ -318,12 +317,9 @@ class DagBag(LoggingMixin):
 
         if result.errors:
             for error in result.errors:
-                # Use the file path from error for ZIP files (contains 
zip/file.py format)
-                # For regular files, use the original filepath
-                if zipfile.is_zipfile(filepath):
-                    error_path = error.file_path if error.file_path else 
filepath
-                else:
-                    error_path = filepath
+                # Use the relative file path from error (importer provides 
relative paths)
+                # Fall back to converting filepath to relative if 
error.file_path is not set
+                error_path = error.file_path if error.file_path else 
self._get_relative_fileloc(filepath)
                 error_msg = error.stacktrace if error.stacktrace else 
error.message
                 self.import_errors[error_path] = error_msg
                 self.log.error("Error loading DAG from %s: %s", error_path, 
error.message)
@@ -356,7 +352,8 @@ class DagBag(LoggingMixin):
                 self.log.debug("DAG %s skipped by cluster policy", dag.dag_id)
             except Exception as e:
                 self.log.exception("Error bagging DAG from %s", filepath)
-                self.import_errors[filepath] = f"{type(e).__name__}: {e}"
+                relative_path = self._get_relative_fileloc(filepath)
+                self.import_errors[relative_path] = f"{type(e).__name__}: {e}"
 
         self.file_last_changed[filepath] = file_last_changed_on_disk
         return bagged_dags
diff --git a/airflow-core/src/airflow/dag_processing/processor.py 
b/airflow-core/src/airflow/dag_processing/processor.py
index 897a07130bb..46fdfecf470 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -255,7 +255,9 @@ def _serialize_dags(
             dagbag_import_error_traceback_depth = conf.getint(
                 "core", "dagbag_import_error_traceback_depth", fallback=None
             )
-            serialization_import_errors[dag.fileloc] = traceback.format_exc(
+            # Use relative_fileloc if available, fall back to fileloc
+            error_path = dag.relative_fileloc or dag.fileloc
+            serialization_import_errors[error_path] = traceback.format_exc(
                 limit=-dagbag_import_error_traceback_depth
             )
     return serialized_dags, serialization_import_errors
diff --git a/airflow-core/tests/unit/dag_processing/test_dagbag.py 
b/airflow-core/tests/unit/dag_processing/test_dagbag.py
index 2aeadde4b66..b8eaccd8e78 100644
--- a/airflow-core/tests/unit/dag_processing/test_dagbag.py
+++ b/airflow-core/tests/unit/dag_processing/test_dagbag.py
@@ -484,6 +484,70 @@ class TestDagBag:
         )
         assert dagbag.dags == dags_in_bag  # Should not change.
 
+    def test_import_errors_use_relative_path_with_bundle(self, tmp_path):
+        """Import errors should use relative paths when bundle_path is set."""
+        bundle_path = tmp_path / "bundle"
+        bundle_path.mkdir()
+        dag_path = bundle_path / "subdir" / "my_dag.py"
+        dag_path.parent.mkdir(parents=True)
+
+        dag_path.write_text("from airflow.sdk import DAG\nraise 
ImportError('test error')")
+
+        dagbag = DagBag(
+            dag_folder=os.fspath(dag_path),
+            include_examples=False,
+            bundle_path=bundle_path,
+            bundle_name="test_bundle",
+        )
+
+        expected_relative_path = "subdir/my_dag.py"
+        assert expected_relative_path in dagbag.import_errors
+        # Absolute path should NOT be a key
+        assert os.fspath(dag_path) not in dagbag.import_errors
+        assert "test error" in dagbag.import_errors[expected_relative_path]
+
+    def test_import_errors_use_relative_path_for_bagging_errors(self, 
tmp_path):
+        """Errors during DAG bagging should use relative paths when 
bundle_path is set."""
+        bundle_path = tmp_path / "bundle"
+        bundle_path.mkdir()
+
+        def create_dag():
+            from airflow.sdk import dag
+
+            @dag(schedule=None, default_args={"owner": "owner1"})
+            def my_flow():
+                pass
+
+            my_flow()
+
+        source_lines = [line[12:] for line in 
inspect.getsource(create_dag).splitlines(keepends=True)[1:]]
+        path1 = bundle_path / "testfile1.py"
+        path2 = bundle_path / "testfile2.py"
+        path1.write_text("".join(source_lines))
+        path2.write_text("".join(source_lines))
+
+        dagbag = DagBag(
+            dag_folder=os.fspath(bundle_path),
+            include_examples=False,
+            bundle_path=bundle_path,
+            bundle_name="test_bundle",
+        )
+
+        # The DAG should load successfully from one file
+        assert "my_flow" in dagbag.dags
+
+        # One file should have a duplicate DAG error - file order is not 
guaranteed
+        assert len(dagbag.import_errors) == 1
+        error_path = next(iter(dagbag.import_errors.keys()))
+
+        # The error key should be a relative path (not absolute)
+        # and of any of the two test files
+        assert error_path in ("testfile1.py", "testfile2.py")
+        # Absolute paths should NOT be keys
+        assert os.fspath(path1) not in dagbag.import_errors
+        assert os.fspath(path2) not in dagbag.import_errors
+        assert "AirflowDagDuplicatedIdException" in 
dagbag.import_errors[error_path]
+
     def test_zip_skip_log(self, caplog, test_zip_path):
         """
         test the loading of a DAG from within a zip file that skips another 
file because

Reply via email to