This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dabfd7ed5c Add exception class name to DAG-parsing error message (#30105)
dabfd7ed5c is described below
commit dabfd7ed5cd2de092d6940544d3e6275f95e0b4b
Author: Changhoon Oh <[email protected]>
AuthorDate: Tue Mar 21 21:55:41 2023 +0900
Add exception class name to DAG-parsing error message (#30105)
---
airflow/models/dagbag.py | 2 +-
tests/models/test_dagbag.py | 11 +++++++----
2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py
index 24d61040f6..8d0f22851c 100644
--- a/airflow/models/dagbag.py
+++ b/airflow/models/dagbag.py
@@ -438,7 +438,7 @@ class DagBag(LoggingMixin):
self.bag_dag(dag=dag, root_dag=dag)
except Exception as e:
self.log.exception("Failed to bag_dag: %s", dag.fileloc)
- self.import_errors[dag.fileloc] = str(e)
+ self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
else:
found_dags.append(dag)
diff --git a/tests/models/test_dagbag.py b/tests/models/test_dagbag.py
index 3deb99ade5..8b64167374 100644
--- a/tests/models/test_dagbag.py
+++ b/tests/models/test_dagbag.py
@@ -184,7 +184,7 @@ class TestDagBag:
found_2 = dagbag.process_file(tf_2.name)
assert len(found_2) == 0
- assert dagbag.import_errors[tf_2.name].startswith("Ignoring DAG")
+ assert dagbag.import_errors[tf_2.name].startswith("AirflowDagDuplicatedIdException: Ignoring DAG")
assert dagbag.dags == dags_in_bag # Should not change.
def test_zip_skip_log(self, caplog):
@@ -1007,12 +1007,14 @@ class TestDagBag:
obey cluster policy.
"""
dag_file = os.path.join(TEST_DAGS_FOLDER, "test_missing_owner.py")
+ dag_id = "test_missing_owner"
+ err_cls_name = "AirflowClusterPolicyViolation"
dagbag = DagBag(dag_folder=dag_file, include_examples=False)
assert set() == set(dagbag.dag_ids)
expected_import_errors = {
dag_file: (
- f"""DAG policy violation (DAG ID: test_missing_owner, Path: {dag_file}):\n"""
+ f"""{err_cls_name}: DAG policy violation (DAG ID: {dag_id}, Path: {dag_file}):\n"""
"""Notices:\n"""
""" * Task must have non-None non-default owner. Current value: airflow"""
)
@@ -1027,12 +1029,14 @@ class TestDagBag:
"""
TEST_DAGS_CORRUPTED_FOLDER = pathlib.Path(__file__).parent.with_name("dags_corrupted")
dag_file = os.path.join(TEST_DAGS_CORRUPTED_FOLDER, "test_nonstring_owner.py")
+ dag_id = "test_nonstring_owner"
+ err_cls_name = "AirflowClusterPolicyViolation"
dagbag = DagBag(dag_folder=dag_file, include_examples=False)
assert set() == set(dagbag.dag_ids)
expected_import_errors = {
dag_file: (
- f"""DAG policy violation (DAG ID: test_nonstring_owner, Path: {dag_file}):\n"""
+ f"""{err_cls_name}: DAG policy violation (DAG ID: {dag_id}, Path: {dag_file}):\n"""
"""Notices:\n"""
""" * owner should be a string. Current value: ['a']"""
)
@@ -1061,7 +1065,6 @@ class TestDagBag:
assert "has no tags" in dagbag.import_errors[dag_file]
def test_dagbag_dag_collection(self):
-
dagbag = DagBag(dag_folder=TEST_DAGS_FOLDER, include_examples=False, collect_dags=False)
# since collect_dags is False, dagbag.dags should be empty
assert not dagbag.dags