ephraimbuddy commented on code in PR #60967:
URL: https://github.com/apache/airflow/pull/60967#discussion_r2720643413
##########
airflow-core/tests/unit/dag_processing/test_dagbag.py:
##########
@@ -484,6 +484,71 @@ def my_flow():
)
assert dagbag.dags == dags_in_bag # Should not change.
+ def test_import_errors_use_relative_path_with_bundle(self, tmp_path):
+ """Import errors should use relative paths when bundle_path is set."""
+ bundle_path = tmp_path / "bundle"
+ bundle_path.mkdir()
+ dag_path = bundle_path / "subdir" / "my_dag.py"
+ dag_path.parent.mkdir(parents=True)
+
+ dag_path.write_text("from airflow.sdk import DAG\nraise ImportError('test error')")
+
+ dagbag = DagBag(
+ dag_folder=os.fspath(dag_path),
+ include_examples=False,
+ bundle_path=bundle_path,
+ bundle_name="test_bundle",
+ )
+
+ expected_relative_path = "subdir/my_dag.py"
+ assert expected_relative_path in dagbag.import_errors
+ # Absolute path should NOT be a key
+ assert os.fspath(dag_path) not in dagbag.import_errors
+ assert "test error" in dagbag.import_errors[expected_relative_path]
+
+ def test_import_errors_use_relative_path_for_bagging_errors(self, tmp_path):
+ """Errors during DAG bagging should use relative paths when bundle_path is set."""
+ bundle_path = tmp_path / "bundle"
+ bundle_path.mkdir()
+
+ def create_dag():
+ from airflow.sdk import dag
+
+ @dag(schedule=None, default_args={"owner": "owner1"})
+ def my_flow():
+ pass
+
+ my_flow()
+
+ source_lines = [line[12:] for line in inspect.getsource(create_dag).splitlines(keepends=True)[1:]]
+ path1 = bundle_path / "testfile1.py"
+ path2 = bundle_path / "testfile2.py"
+ path1.write_text("".join(source_lines))
+ path2.write_text("".join(source_lines))
+
+ dagbag = DagBag(
+ dag_folder=os.fspath(bundle_path),
+ include_examples=False,
+ bundle_path=bundle_path,
+ bundle_name="test_bundle",
+ )
+
+ # The DAG should load successfully from one file
+ assert "my_flow" in dagbag.dags
+
+ # One file should have a duplicate DAG error - file order is not guaranteed
+ assert len(dagbag.import_errors) == 1
+ error_path = next(iter(dagbag.import_errors.keys()))
+ breakpoint()
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]