This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 35422f087ae Change leftover import error path record to relative path (#60967)
35422f087ae is described below
commit 35422f087aeffb65aa0044ed4a21ee0bb11680a1
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Jan 26 18:32:53 2026 +0100
Change leftover import error path record to relative path (#60967)
* Change leftover import error path record to relative path
These too should use relative paths for import errors
* fixup! Change leftover import error path record to relative path
* fixup! fixup! Change leftover import error path record to relative path
* Apply suggestion from @ephraimbuddy
---
airflow-core/src/airflow/dag_processing/dagbag.py | 13 ++---
.../src/airflow/dag_processing/processor.py | 4 +-
.../tests/unit/dag_processing/test_dagbag.py | 64 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 9 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/dagbag.py b/airflow-core/src/airflow/dag_processing/dagbag.py
index 5c21254cf6e..649be758477 100644
--- a/airflow-core/src/airflow/dag_processing/dagbag.py
+++ b/airflow-core/src/airflow/dag_processing/dagbag.py
@@ -22,7 +22,6 @@ import os
import sys
import textwrap
import warnings
-import zipfile
from collections.abc import Generator
from datetime import datetime, timedelta
from pathlib import Path
@@ -318,12 +317,9 @@ class DagBag(LoggingMixin):
if result.errors:
for error in result.errors:
- # Use the file path from error for ZIP files (contains zip/file.py format)
- # For regular files, use the original filepath
- if zipfile.is_zipfile(filepath):
- error_path = error.file_path if error.file_path else filepath
- else:
- error_path = filepath
+ # Use the relative file path from error (importer provides relative paths)
+ # Fall back to converting filepath to relative if error.file_path is not set
+ error_path = error.file_path if error.file_path else self._get_relative_fileloc(filepath)
error_msg = error.stacktrace if error.stacktrace else error.message
self.import_errors[error_path] = error_msg
self.log.error("Error loading DAG from %s: %s", error_path, error.message)
@@ -356,7 +352,8 @@ class DagBag(LoggingMixin):
self.log.debug("DAG %s skipped by cluster policy", dag.dag_id)
except Exception as e:
self.log.exception("Error bagging DAG from %s", filepath)
- self.import_errors[filepath] = f"{type(e).__name__}: {e}"
+ relative_path = self._get_relative_fileloc(filepath)
+ self.import_errors[relative_path] = f"{type(e).__name__}: {e}"
self.file_last_changed[filepath] = file_last_changed_on_disk
return bagged_dags
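
For context, a minimal sketch of the behaviour assumed of a helper like _get_relative_fileloc: it strips the bundle prefix from an absolute DAG file path and leaves paths outside the bundle untouched. The sketch below (names and all) is illustrative only and is not the actual Airflow implementation.

from pathlib import Path


def relative_fileloc_sketch(bundle_path, filepath):
    """Illustrative only: return filepath relative to bundle_path when possible."""
    if bundle_path is None:
        return filepath
    try:
        # Strip the bundle prefix from the absolute path.
        return str(Path(filepath).relative_to(bundle_path))
    except ValueError:
        # filepath lives outside the bundle; keep it unchanged.
        return filepath


# Matches the key asserted in the new tests below: "subdir/my_dag.py"
print(relative_fileloc_sketch(Path("/opt/bundle"), "/opt/bundle/subdir/my_dag.py"))
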
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 897a07130bb..46fdfecf470 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -255,7 +255,9 @@ def _serialize_dags(
dagbag_import_error_traceback_depth = conf.getint(
"core", "dagbag_import_error_traceback_depth", fallback=None
)
- serialization_import_errors[dag.fileloc] = traceback.format_exc(
+ # Use relative_fileloc if available, fall back to fileloc
+ error_path = dag.relative_fileloc or dag.fileloc
+ serialization_import_errors[error_path] = traceback.format_exc(
limit=-dagbag_import_error_traceback_depth
)
return serialized_dags, serialization_import_errors
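
The error-key fallback above is plain or-chaining: prefer relative_fileloc, use fileloc when it is unset. A small self-contained sketch of that choice follows; the SimpleNamespace objects stand in for DAG objects and are hypothetical, with attribute names taken from the diff.

from types import SimpleNamespace

serialization_import_errors = {}

dags = [
    SimpleNamespace(relative_fileloc="subdir/my_dag.py", fileloc="/opt/bundle/subdir/my_dag.py"),
    SimpleNamespace(relative_fileloc=None, fileloc="/opt/bundle/other_dag.py"),
]

for dag in dags:
    # Prefer the relative location; fall back to the absolute fileloc when it is unset.
    error_path = dag.relative_fileloc or dag.fileloc
    serialization_import_errors[error_path] = "ValueError: example serialization failure"

print(sorted(serialization_import_errors))
# ['/opt/bundle/other_dag.py', 'subdir/my_dag.py']
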
diff --git a/airflow-core/tests/unit/dag_processing/test_dagbag.py b/airflow-core/tests/unit/dag_processing/test_dagbag.py
index 2aeadde4b66..b8eaccd8e78 100644
--- a/airflow-core/tests/unit/dag_processing/test_dagbag.py
+++ b/airflow-core/tests/unit/dag_processing/test_dagbag.py
@@ -484,6 +484,70 @@ class TestDagBag:
)
assert dagbag.dags == dags_in_bag # Should not change.
+ def test_import_errors_use_relative_path_with_bundle(self, tmp_path):
+ """Import errors should use relative paths when bundle_path is set."""
+ bundle_path = tmp_path / "bundle"
+ bundle_path.mkdir()
+ dag_path = bundle_path / "subdir" / "my_dag.py"
+ dag_path.parent.mkdir(parents=True)
+
+ dag_path.write_text("from airflow.sdk import DAG\nraise ImportError('test error')")
+
+ dagbag = DagBag(
+ dag_folder=os.fspath(dag_path),
+ include_examples=False,
+ bundle_path=bundle_path,
+ bundle_name="test_bundle",
+ )
+
+ expected_relative_path = "subdir/my_dag.py"
+ assert expected_relative_path in dagbag.import_errors
+ # Absolute path should NOT be a key
+ assert os.fspath(dag_path) not in dagbag.import_errors
+ assert "test error" in dagbag.import_errors[expected_relative_path]
+
+ def test_import_errors_use_relative_path_for_bagging_errors(self, tmp_path):
+ """Errors during DAG bagging should use relative paths when bundle_path is set."""
+ bundle_path = tmp_path / "bundle"
+ bundle_path.mkdir()
+
+ def create_dag():
+ from airflow.sdk import dag
+
+ @dag(schedule=None, default_args={"owner": "owner1"})
+ def my_flow():
+ pass
+
+ my_flow()
+
+ source_lines = [line[12:] for line in inspect.getsource(create_dag).splitlines(keepends=True)[1:]]
+ path1 = bundle_path / "testfile1.py"
+ path2 = bundle_path / "testfile2.py"
+ path1.write_text("".join(source_lines))
+ path2.write_text("".join(source_lines))
+
+ dagbag = DagBag(
+ dag_folder=os.fspath(bundle_path),
+ include_examples=False,
+ bundle_path=bundle_path,
+ bundle_name="test_bundle",
+ )
+
+ # The DAG should load successfully from one file
+ assert "my_flow" in dagbag.dags
+
+ # One file should have a duplicate DAG error - file order is not guaranteed
+ assert len(dagbag.import_errors) == 1
+ error_path = next(iter(dagbag.import_errors.keys()))
+
+ # The error key should be a relative path (not absolute)
+ # and match one of the two test files
+ assert error_path in ("testfile1.py", "testfile2.py")
+ # Absolute paths should NOT be keys
+ assert os.fspath(path1) not in dagbag.import_errors
+ assert os.fspath(path2) not in dagbag.import_errors
+ assert "AirflowDagDuplicatedIdException" in
dagbag.import_errors[error_path]
+
def test_zip_skip_log(self, caplog, test_zip_path):
"""
test the loading of a DAG from within a zip file that skips another file because